Сайн уу. Тус курсын оюутнуудад зориулан бэлтгэсэн нийтлэлийн эцсийн хэсгийн орчуулгыг хүргэж байна.
Бодит цагийн дамжуулах хоолойд зориулсан Apache Beam ба DataFlow
Google Cloud-г тохируулж байна
Тайлбар: Би Python 3 дээр дамжуулах хоолойг ажиллуулахад асуудалтай байсан тул Google Cloud Shell-ийг ашиглаж, хэрэглэгчийн бүртгэлийн өгөгдлийг нийтлэх боломжтой. Google Cloud Shell нь Python 2-г ашигладаг бөгөөд энэ нь Apache Beam-тэй илүү нийцдэг.
Дамжуулах хоолойг эхлүүлэхийн тулд бид тохиргоонд бага зэрэг ухах хэрэгтэй. Өмнө нь GCP ашиглаж байгаагүй хүмүүсийн хувьд та үүнд дурдсан дараах 6 алхамыг дагах хэрэгтэй
Үүний дараа бид скриптүүдээ Google Cloud Storage-д байршуулж, Google Cloud Shel руу хуулах хэрэгтэй болно. Клоуд санд байршуулах нь маш энгийн зүйл юм (тайлбарыг олж болно
2 зураг
Файлуудыг хуулж, шаардлагатай сангуудыг суулгахад шаардлагатай командуудыг доор жагсаав.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Манай мэдээллийн сан болон хүснэгтийг бий болгож байна
Тохируулгатай холбоотой бүх алхмуудыг хийж дууссаны дараа бидний хийх дараагийн зүйл бол BigQuery дээр өгөгдлийн багц болон хүснэгт үүсгэх явдал юм. Үүнийг хийх хэд хэдэн арга байдаг ч хамгийн энгийн нь Google Cloud консолыг ашиглан эхлээд датасет үүсгэх явдал юм. Та доорх алхмуудыг дагаж болно
Зураг 3. Хүснэгтийн зохион байгуулалт
Хэрэглэгчийн бүртгэлийн өгөгдлийг нийтэлж байна
Pub/Sub нь олон бие даасан программууд хоорондоо холбогдох боломжийг олгодог тул манай дамжуулах хоолойн чухал бүрэлдэхүүн хэсэг юм. Ялангуяа энэ нь бидэнд программуудын хооронд мессеж илгээх, хүлээн авах боломжийг олгодог зуучлагчийн үүрэг гүйцэтгэдэг. Бидний хийх ёстой хамгийн эхний зүйл бол сэдэв үүсгэх явдал юм. Зүгээр л консол дахь Pub/Sub руу ороод СЭДВИЙГ ҮЗҮҮЛЭХ дээр дарна уу.
Доорх код нь дээр тодорхойлсон бүртгэлийн өгөгдлийг үүсгэхийн тулд манай скриптийг дуудаж, дараа нь логуудыг Pub/Sub руу холбож илгээдэг. Бидний хийх ёстой цорын ганц зүйл бол объект үүсгэх явдал юм PublisherClient, аргыг ашиглан сэдэвт хүрэх замыг зааж өгнө topic_path
мөн функцийг дуудна publish
с topic_path
болон өгөгдөл. Манайх импортолж байгааг анхаарна уу generate_log_line
манай скриптээс stream_logs
, тиймээс эдгээр файлууд нэг хавтсанд байгаа эсэхийг шалгаарай, эс тэгвээс импортын алдаа гарах болно. Дараа нь бид үүнийг Google консолоор дамжуулан ажиллуулж болно:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Файл ажиллаж эхэлмэгц бид доорх зурагт үзүүлсэн шиг консол руу лог өгөгдлийн гаралтыг харах боломжтой болно. Энэ скрипт нь бид ашиглахгүй бол ажиллах болно CTRL + Cдуусгахын тулд.
Зураг 4. Гаралт publish_logs.py
Бидний дамжуулах хоолойн кодыг бичиж байна
Одоо бид бүх зүйлээ бэлтгэсэн тул бид хөгжилтэй хэсгийг эхлүүлж болно - Beam болон Python ашиглан дамжуулах хоолойг кодлох. Beam дамжуулах хоолой үүсгэхийн тулд бид дамжуулах хоолойн объектыг (p) үүсгэх хэрэгтэй. Дамжуулах шугамын объектыг үүсгэсний дараа бид операторыг ашиглан олон функцийг ар араас нь ашиглаж болно pipe (|)
. Ерөнхийдөө ажлын явц нь доорх зураг шиг харагдаж байна.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Манай кодонд бид хоёр захиалгат функц үүсгэх болно. Чиг үүрэг regex_clean
, энэ нь өгөгдлийг сканнердаж, функцийг ашиглан PATTERNS жагсаалтад үндэслэн харгалзах мөрийг гаргаж авдаг. re.search
. Уг функц нь таслалаар тусгаарлагдсан мөрийг буцаана. Хэрэв та ердийн илэрхийлэлийн мэргэжилтэн биш бол би үүнийг шалгахыг зөвлөж байна datetime
Үүнийг ажиллуулахын тулд функц дотор. Би файлын эхэнд импортын алдаа гаргаж байсан нь хачирхалтай байсан. Дараа нь энэ жагсаалтыг функц руу шилжүүлнэ WriteToBigQuery, энэ нь зүгээр л бидний өгөгдлийг хүснэгтэд нэмдэг. Batch DataFlow Job болон Streaming DataFlow Job-ын кодыг доор өгөв. Багц болон урсгал код хоёрын цорын ганц ялгаа нь багц хэлбэрээр бид CSV-г уншдаг явдал юм src_path
функцийг ашиглан ReadFromText
Beam-аас.
Багц DataFlow ажил (багц боловсруулах)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Конвейерийг эхлүүлж байна
Бид дамжуулах хоолойг хэд хэдэн аргаар ажиллуулж болно. Хэрэв бид хүсвэл GCP руу алсаас нэвтэрч байхдаа терминалаас үүнийг дотооддоо ажиллуулж болно.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Гэсэн хэдий ч бид үүнийг DataFlow ашиглан ажиллуулах болно. Дараах шаардлагатай параметрүүдийг тохируулснаар бид доорх тушаалыг ашиглан үүнийг хийж болно.
project
— Таны GCP төслийн ID.runner
нь таны хөтөлбөрт дүн шинжилгээ хийж, дамжуулах хоолойг тань барих шугам хоолой юм. Үүлэн дээр ажиллахын тулд та DataflowRunner-г зааж өгөх ёстой.staging_location
— уг ажлыг гүйцэтгэж буй процессоруудад шаардлагатай кодын багцыг индексжүүлэх зориулалттай Cloud Dataflow үүл хадгалах сан руу хүрэх зам.temp_location
— Дамжуулах хоолой ажиллаж байх үед үүссэн түр зуурын ажлын файлуудыг хадгалах зориулалттай Cloud Dataflow үүл сан руу орох зам.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Энэ тушаал ажиллаж байх үед бид google консол дахь DataFlow таб руу орж, дамжуулах шугамаа харах боломжтой. Дамжуулах хоолой дээр дарахад бид Зураг 4-тэй төстэй зүйлийг харах болно. Дибаг хийх зорилгоор Logs, дараа нь Stackdriver руу очиж нарийвчилсан бүртгэлүүдийг үзэх нь маш тустай. Энэ нь надад хэд хэдэн тохиолдолд дамжуулах хоолойн асуудлыг шийдвэрлэхэд тусалсан.
Зураг 4: Цацраг дамжуулагч
BigQuery дээрх манай өгөгдөлд хандах
Тиймээс, бид хүснэгтэд өгөгдөл урсаж байгаа дамжуулах хоолойтой байх ёстой. Үүнийг шалгахын тулд бид BigQuery руу орж өгөгдлийг харах боломжтой. Доорх тушаалыг ашигласны дараа та өгөгдлийн багцын эхний хэдэн мөрийг харах ёстой. Одоо бид BigQuery-д хадгалагдсан өгөгдөлтэй болсон тул бид цаашдын дүн шинжилгээ хийх, мөн мэдээллийг хамтран ажиллагсадтайгаа хуваалцаж, бизнесийн асуултуудад хариулж эхлэх боломжтой.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Зураг 5: BigQuery
дүгнэлт
Энэ нийтлэл нь өгөгдлийн урсгалыг бий болгох, мөн өгөгдлийг илүү хүртээмжтэй болгох арга замыг олоход хэрэгтэй жишээ болно гэж найдаж байна. Энэ форматаар өгөгдлийг хадгалах нь бидэнд олон давуу талыг өгдөг. Одоо манай бүтээгдэхүүнийг хэр олон хүн ашигладаг вэ гэх мэт чухал асуултуудад хариулж эхлэх боломжтой. Таны хэрэглэгчийн бааз цаг хугацаа өнгөрөх тусам нэмэгдэж байна уу? Хүмүүс бүтээгдэхүүний ямар талуудтай хамгийн их харьцдаг вэ? Мөн байх ёсгүй алдаанууд байдаг уу? Эдгээр нь байгууллагын сонирхлыг татах асуултууд юм. Эдгээр асуултын хариултаас олж авсан ойлголт дээр үндэслэн бид бүтээгдэхүүнийг сайжруулж, хэрэглэгчийн оролцоог нэмэгдүүлэх боломжтой.
Beam нь энэ төрлийн дасгалд үнэхээр хэрэгтэй бөгөөд бусад олон сонирхолтой хэрэглээтэй. Жишээлбэл, та хувьцааны хачгийн мэдээллийг бодит цаг хугацаанд нь шинжилж, дүн шинжилгээнд үндэслэн арилжаа хийх, магадгүй танд тээврийн хэрэгслээс мэдрэгчийн өгөгдөл ирж байгаа бөгөөд замын хөдөлгөөний түвшинг тооцоолохыг хүсч болно. Жишээлбэл, та хэрэглэгчийн мэдээллийг цуглуулж, үндсэн хэмжигдэхүүнийг хянах самбар үүсгэхэд ашигладаг тоглоомын компани байж болно. За, ноёд оо, энэ бол өөр нийтлэлийн сэдэв, уншсанд баярлалаа, мөн кодыг бүрэн эхээр нь үзэхийг хүссэн хүмүүст миний GitHub-ийн линк доор байна.
Ердөө л тэр.
Эх сурвалж: www.habr.com