Hammaga salom. Kurs talabalari uchun maxsus tayyorlangan maqolaning yakuniy qismi tarjimasini baham ko'ramiz.
Haqiqiy vaqtda quvurlar uchun Apache Beam va DataFlow
Google Cloud sozlanmoqda
Eslatma: Men Google Cloud Shell’dan quvur liniyasini ishga tushirish va maxsus jurnal ma’lumotlarini nashr qilish uchun foydalanardim, chunki Python 3’da quvur liniyasini ishga tushirishda muammoga duch keldim. Google Cloud Shell Python 2’dan foydalanadi, bu Apache Beam’ga ko‘proq mos keladi.
Quvurni ishga tushirish uchun biz sozlamalarni biroz qazishimiz kerak. Ilgari GCP dan foydalanmaganlar uchun bunda ko'rsatilgan quyidagi 6 qadamni bajarishingiz kerak bo'ladi
Shundan so'ng biz skriptlarimizni Google Cloud Storage-ga yuklashimiz va ularni Google Cloud Shel-ga nusxalashimiz kerak. Bulutli xotiraga yuklash juda ahamiyatsiz (tavsifni topish mumkin
Shakl 2
Fayllarni nusxalash va kerakli kutubxonalarni o'rnatishimiz kerak bo'lgan buyruqlar quyida keltirilgan.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Ma'lumotlar bazasi va jadvalimizni yaratish
O'rnatish bilan bog'liq barcha bosqichlarni bajarganimizdan so'ng, biz qilishimiz kerak bo'lgan navbatdagi narsa BigQuery-da ma'lumotlar to'plami va jadval yaratishdir. Buning bir necha yo'li bor, lekin eng oddiyi, avval ma'lumotlar to'plamini yaratish orqali Google Cloud konsolidan foydalanish. Quyidagi amallarni bajarishingiz mumkin
Rasm 3. Jadvalning joylashuvi
Foydalanuvchi jurnali ma'lumotlarini nashr qilish
Pub/Sub bizning quvur liniyasining muhim tarkibiy qismidir, chunki u bir nechta mustaqil dasturlarning bir-biri bilan aloqa qilishiga imkon beradi. Xususan, u bizga ilovalar o'rtasida xabar yuborish va qabul qilish imkonini beruvchi vositachi sifatida ishlaydi. Biz qilishimiz kerak bo'lgan birinchi narsa - mavzu yaratish. Shunchaki konsoldagi Pub/Sub-ga o'ting va MAVZU YARATISH tugmasini bosing.
Quyidagi kod yuqorida belgilangan jurnal ma'lumotlarini yaratish uchun skriptimizni chaqiradi va keyin ulanadi va jurnallarni Pub/Sub-ga yuboradi. Biz qilishimiz kerak bo'lgan yagona narsa - ob'ekt yaratish PublisherClient, usul yordamida mavzuga yo'lni belgilang topic_path
va funksiyani chaqiring publish
с topic_path
va ma'lumotlar. E'tibor bering, biz import qilamiz generate_log_line
bizning skriptimizdan stream_logs
, shuning uchun bu fayllar bir papkada ekanligiga ishonch hosil qiling, aks holda siz import xatosiga duch kelasiz. Keyin buni Google konsolimiz orqali ishga tushirishimiz mumkin:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Fayl ishga tushishi bilan biz quyidagi rasmda ko'rsatilganidek, jurnal ma'lumotlarining konsolga chiqishini ko'rishimiz mumkin. Bu skript biz foydalanmagunimizcha ishlaydi CTRL + Cuni bajarish uchun.
4-rasm. Chiqish publish_logs.py
Bizning quvur liniyasi kodini yozish
Endi bizda hamma narsa tayyor, biz qiziqarli qismni - Beam va Python yordamida quvur liniyasini kodlashni boshlashimiz mumkin. Beam quvur liniyasini yaratish uchun biz quvur liniyasi ob'ektini (p) yaratishimiz kerak. Biz quvur liniyasi ob'ektini yaratganimizdan so'ng, operator yordamida bir nechta funktsiyalarni birin-ketin qo'llashimiz mumkin pipe (|)
. Umuman olganda, ish jarayoni quyidagi rasmga o'xshaydi.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Bizning kodimizda biz ikkita maxsus funktsiyani yaratamiz. Funktsiya regex_clean
, bu funksiya yordamida ma'lumotlarni skanerlaydi va PATTERNS ro'yxati asosida mos keladigan qatorni oladi re.search
. Funktsiya vergul bilan ajratilgan qatorni qaytaradi. Agar siz oddiy ifoda mutaxassisi bo'lmasangiz, buni tekshirishni tavsiya etaman datetime
uni ishlashi uchun funktsiya ichida. Fayl boshida import xatosi oldim, bu g'alati edi. Keyin bu ro'yxat funksiyaga o'tkaziladi WriteToBigQuery, bu bizning ma'lumotlarimizni jadvalga qo'shadi. Batch DataFlow Job va Streaming DataFlow Job uchun kod quyida keltirilgan. Ommaviy va oqimli kod o'rtasidagi yagona farq shundaki, biz to'plamda CSV ni o'qiymiz src_path
funksiyasidan foydalanish ReadFromText
Beamdan.
Batch DataFlow Job (paketni qayta ishlash)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (oqimli ishlov berish)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Konveyerni ishga tushirish
Biz quvur liniyasini turli yo'llar bilan ishlatishimiz mumkin. Agar xohlasak, GCP-ga masofadan kirish paytida uni terminaldan mahalliy sifatida ishga tushirishimiz mumkin edi.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Biroq, biz uni DataFlow yordamida ishga tushiramiz. Buni quyidagi kerakli parametrlarni o'rnatish orqali quyidagi buyruq yordamida amalga oshirishimiz mumkin.
project
— GCP loyihangiz identifikatori.runner
dasturingizni tahlil qiladigan va quvur liniyasini quradigan quvur liniyasi. Bulutda ishlash uchun DataflowRunner-ni belgilashingiz kerak.staging_location
— ishni bajarayotgan protsessorlar uchun zarur boʻlgan kod paketlarini indekslash uchun Cloud Dataflow bulutli xotirasiga yoʻl.temp_location
— quvur liniyasi ishlayotgan vaqtda yaratilgan vaqtinchalik ish fayllarini saqlash uchun Cloud Dataflow bulutli xotirasiga yoʻl.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Ushbu buyruq ishlayotganda, biz Google konsolidagi DataFlow yorlig'iga o'tishimiz va quvurimizni ko'rishimiz mumkin. Quvur liniyasini bosganimizda, biz 4-rasmga o'xshash narsani ko'rishimiz kerak. Nosozliklarni tuzatish maqsadida, batafsil jurnallarni ko'rish uchun Logs va keyin Stackdriver-ga o'tish juda foydali bo'lishi mumkin. Bu menga bir qator holatlarda quvur liniyasi bilan bog'liq muammolarni hal qilishda yordam berdi.
4-rasm: nurli konveyer
BigQuery maʼlumotlarimizga kirish
Shunday qilib, biz allaqachon jadvalimizga ma'lumotlar oqimi bilan ishlaydigan quvurga ega bo'lishimiz kerak. Buni sinab ko'rish uchun biz BigQuery-ga o'tamiz va ma'lumotlarni ko'rib chiqamiz. Quyidagi buyruqdan foydalangandan so'ng siz ma'lumotlar to'plamining birinchi qatorlarini ko'rishingiz kerak. Endi biz BigQuery-da saqlangan ma'lumotlarga egamiz, biz keyingi tahlillarni o'tkazishimiz, shuningdek, ma'lumotlarni hamkasblar bilan baham ko'rishimiz va biznes savollariga javob berishni boshlashimiz mumkin.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5-rasm: BigQuery
xulosa
Umid qilamizki, ushbu post ma'lumotlar oqimini yaratish, shuningdek, ma'lumotlarga kirishni yanada qulayroq qilish yo'llarini topishning foydali namunasi bo'ladi. Ushbu formatda ma'lumotlarni saqlash bizga ko'p afzalliklarni beradi. Endi mahsulotimizdan qancha odam foydalanishi kabi muhim savollarga javob berishni boshlashimiz mumkin. Vaqt o'tishi bilan foydalanuvchi bazangiz o'sib bormoqdami? Odamlar mahsulotning qaysi jihatlari bilan ko'proq aloqada bo'lishadi? Va bo'lmasligi kerak bo'lgan xatolar bormi? Bular tashkilotni qiziqtiradigan savollar. Ushbu savollarga javoblardan kelib chiqadigan tushunchalarga asoslanib, biz mahsulotni yaxshilashimiz va foydalanuvchilarning faolligini oshirishimiz mumkin.
Beam ushbu turdagi mashqlar uchun juda foydali va boshqa bir qator qiziqarli foydalanish holatlariga ham ega. Misol uchun, siz real vaqt rejimida birja ma'lumotlarini tahlil qilishni va tahlillar asosida savdolarni amalga oshirishni xohlashingiz mumkin, ehtimol sizda transport vositalaridan kelgan sensor ma'lumotlari bor va trafik darajasini hisoblashni xohlaysiz. Shuningdek, siz, masalan, foydalanuvchi ma'lumotlarini to'playdigan va undan asosiy ko'rsatkichlarni kuzatish uchun asboblar paneli yaratish uchun foydalanadigan o'yin kompaniyasi bo'lishingiz mumkin. Mayli, janoblar, bu boshqa post uchun mavzu, o'qiganingiz uchun rahmat va to'liq kodni ko'rishni istaganlar uchun quyida mening GitHub havolasi mavjud.
Hammasi shu.
Manba: www.habr.com