Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism

Hammaga salom. Kurs talabalari uchun maxsus tayyorlangan maqolaning yakuniy qismi tarjimasini baham ko'ramiz. Ma'lumotlar muhandisi. Birinchi qismni o'qishingiz mumkin shu yerda.

Haqiqiy vaqtda quvurlar uchun Apache Beam va DataFlow

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism

Google Cloud sozlanmoqda

Eslatma: Men Google Cloud Shell’dan quvur liniyasini ishga tushirish va maxsus jurnal ma’lumotlarini nashr qilish uchun foydalanardim, chunki Python 3’da quvur liniyasini ishga tushirishda muammoga duch keldim. Google Cloud Shell Python 2’dan foydalanadi, bu Apache Beam’ga ko‘proq mos keladi.

Quvurni ishga tushirish uchun biz sozlamalarni biroz qazishimiz kerak. Ilgari GCP dan foydalanmaganlar uchun bunda ko'rsatilgan quyidagi 6 qadamni bajarishingiz kerak bo'ladi sahifa.

Shundan so'ng biz skriptlarimizni Google Cloud Storage-ga yuklashimiz va ularni Google Cloud Shel-ga nusxalashimiz kerak. Bulutli xotiraga yuklash juda ahamiyatsiz (tavsifni topish mumkin shu yerda). Fayllarimizdan nusxa olish uchun biz Google Cloud Shel-ni asboblar panelidan quyidagi 2-rasmdagi chapdagi birinchi belgini bosish orqali ochishimiz mumkin.

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism
Shakl 2

Fayllarni nusxalash va kerakli kutubxonalarni o'rnatishimiz kerak bo'lgan buyruqlar quyida keltirilgan.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Ma'lumotlar bazasi va jadvalimizni yaratish

O'rnatish bilan bog'liq barcha bosqichlarni bajarganimizdan so'ng, biz qilishimiz kerak bo'lgan navbatdagi narsa BigQuery-da ma'lumotlar to'plami va jadval yaratishdir. Buning bir necha yo'li bor, lekin eng oddiyi, avval ma'lumotlar to'plamini yaratish orqali Google Cloud konsolidan foydalanish. Quyidagi amallarni bajarishingiz mumkin aloqasxema bilan jadval yaratish uchun. Bizning stolimiz bo'ladi 7 ta ustun, har bir foydalanuvchi jurnalining komponentlariga mos keladi. Qulaylik uchun biz barcha ustunlarni satr sifatida belgilaymiz, timelocal o'zgaruvchidan tashqari va ularni avval yaratilgan o'zgaruvchilarga ko'ra nomlaymiz. Jadvalimizning tartibi 3-rasmdagi kabi bo'lishi kerak.

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism
Rasm 3. Jadvalning joylashuvi

Foydalanuvchi jurnali ma'lumotlarini nashr qilish

Pub/Sub bizning quvur liniyasining muhim tarkibiy qismidir, chunki u bir nechta mustaqil dasturlarning bir-biri bilan aloqa qilishiga imkon beradi. Xususan, u bizga ilovalar o'rtasida xabar yuborish va qabul qilish imkonini beruvchi vositachi sifatida ishlaydi. Biz qilishimiz kerak bo'lgan birinchi narsa - mavzu yaratish. Shunchaki konsoldagi Pub/Sub-ga o'ting va MAVZU YARATISH tugmasini bosing.

Quyidagi kod yuqorida belgilangan jurnal ma'lumotlarini yaratish uchun skriptimizni chaqiradi va keyin ulanadi va jurnallarni Pub/Sub-ga yuboradi. Biz qilishimiz kerak bo'lgan yagona narsa - ob'ekt yaratish PublisherClient, usul yordamida mavzuga yo'lni belgilang topic_path va funksiyani chaqiring publish с topic_path va ma'lumotlar. E'tibor bering, biz import qilamiz generate_log_line bizning skriptimizdan stream_logs, shuning uchun bu fayllar bir papkada ekanligiga ishonch hosil qiling, aks holda siz import xatosiga duch kelasiz. Keyin buni Google konsolimiz orqali ishga tushirishimiz mumkin:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Fayl ishga tushishi bilan biz quyidagi rasmda ko'rsatilganidek, jurnal ma'lumotlarining konsolga chiqishini ko'rishimiz mumkin. Bu skript biz foydalanmagunimizcha ishlaydi CTRL + Cuni bajarish uchun.

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism
4-rasm. Chiqish publish_logs.py

Bizning quvur liniyasi kodini yozish

Endi bizda hamma narsa tayyor, biz qiziqarli qismni - Beam va Python yordamida quvur liniyasini kodlashni boshlashimiz mumkin. Beam quvur liniyasini yaratish uchun biz quvur liniyasi ob'ektini (p) yaratishimiz kerak. Biz quvur liniyasi ob'ektini yaratganimizdan so'ng, operator yordamida bir nechta funktsiyalarni birin-ketin qo'llashimiz mumkin pipe (|). Umuman olganda, ish jarayoni quyidagi rasmga o'xshaydi.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Bizning kodimizda biz ikkita maxsus funktsiyani yaratamiz. Funktsiya regex_clean, bu funksiya yordamida ma'lumotlarni skanerlaydi va PATTERNS ro'yxati asosida mos keladigan qatorni oladi re.search. Funktsiya vergul bilan ajratilgan qatorni qaytaradi. Agar siz oddiy ifoda mutaxassisi bo'lmasangiz, buni tekshirishni tavsiya etaman darslik va kodni tekshirish uchun bloknotda mashq qiling. Shundan so'ng biz ParDo deb nomlangan maxsus funksiyani aniqlaymiz Split, bu parallel ishlov berish uchun Beam konvertatsiyasining o'zgarishi. Pythonda bu maxsus usulda amalga oshiriladi - biz DoFn Beam sinfidan meros bo'lgan sinf yaratishimiz kerak. Split funksiyasi oldingi funksiyadan ajratilgan qatorni oladi va BigQuery jadvalimizdagi ustun nomlariga mos keladigan tugmachalar bilan lug‘atlar ro‘yxatini qaytaradi. Ushbu funktsiyaga e'tibor berish kerak bo'lgan narsa bor: import qilishim kerak edi datetime uni ishlashi uchun funktsiya ichida. Fayl boshida import xatosi oldim, bu g'alati edi. Keyin bu ro'yxat funksiyaga o'tkaziladi WriteToBigQuery, bu bizning ma'lumotlarimizni jadvalga qo'shadi. Batch DataFlow Job va Streaming DataFlow Job uchun kod quyida keltirilgan. Ommaviy va oqimli kod o'rtasidagi yagona farq shundaki, biz to'plamda CSV ni o'qiymiz src_pathfunksiyasidan foydalanish ReadFromText Beamdan.

Batch DataFlow Job (paketni qayta ishlash)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (oqimli ishlov berish)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Konveyerni ishga tushirish

Biz quvur liniyasini turli yo'llar bilan ishlatishimiz mumkin. Agar xohlasak, GCP-ga masofadan kirish paytida uni terminaldan mahalliy sifatida ishga tushirishimiz mumkin edi.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Biroq, biz uni DataFlow yordamida ishga tushiramiz. Buni quyidagi kerakli parametrlarni o'rnatish orqali quyidagi buyruq yordamida amalga oshirishimiz mumkin.

  • project — GCP loyihangiz identifikatori.
  • runner dasturingizni tahlil qiladigan va quvur liniyasini quradigan quvur liniyasi. Bulutda ishlash uchun DataflowRunner-ni belgilashingiz kerak.
  • staging_location — ishni bajarayotgan protsessorlar uchun zarur boʻlgan kod paketlarini indekslash uchun Cloud Dataflow bulutli xotirasiga yoʻl.
  • temp_location — quvur liniyasi ishlayotgan vaqtda yaratilgan vaqtinchalik ish fayllarini saqlash uchun Cloud Dataflow bulutli xotirasiga yoʻl.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Ushbu buyruq ishlayotganda, biz Google konsolidagi DataFlow yorlig'iga o'tishimiz va quvurimizni ko'rishimiz mumkin. Quvur liniyasini bosganimizda, biz 4-rasmga o'xshash narsani ko'rishimiz kerak. Nosozliklarni tuzatish maqsadida, batafsil jurnallarni ko'rish uchun Logs va keyin Stackdriver-ga o'tish juda foydali bo'lishi mumkin. Bu menga bir qator holatlarda quvur liniyasi bilan bog'liq muammolarni hal qilishda yordam berdi.

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism
4-rasm: nurli konveyer

BigQuery maʼlumotlarimizga kirish

Shunday qilib, biz allaqachon jadvalimizga ma'lumotlar oqimi bilan ishlaydigan quvurga ega bo'lishimiz kerak. Buni sinab ko'rish uchun biz BigQuery-ga o'tamiz va ma'lumotlarni ko'rib chiqamiz. Quyidagi buyruqdan foydalangandan so'ng siz ma'lumotlar to'plamining birinchi qatorlarini ko'rishingiz kerak. Endi biz BigQuery-da saqlangan ma'lumotlarga egamiz, biz keyingi tahlillarni o'tkazishimiz, shuningdek, ma'lumotlarni hamkasblar bilan baham ko'rishimiz va biznes savollariga javob berishni boshlashimiz mumkin.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Biz oqim ma'lumotlarini qayta ishlash quvur liniyasini yaratamiz. 2-qism
5-rasm: BigQuery

xulosa

Umid qilamizki, ushbu post ma'lumotlar oqimini yaratish, shuningdek, ma'lumotlarga kirishni yanada qulayroq qilish yo'llarini topishning foydali namunasi bo'ladi. Ushbu formatda ma'lumotlarni saqlash bizga ko'p afzalliklarni beradi. Endi mahsulotimizdan qancha odam foydalanishi kabi muhim savollarga javob berishni boshlashimiz mumkin. Vaqt o'tishi bilan foydalanuvchi bazangiz o'sib bormoqdami? Odamlar mahsulotning qaysi jihatlari bilan ko'proq aloqada bo'lishadi? Va bo'lmasligi kerak bo'lgan xatolar bormi? Bular tashkilotni qiziqtiradigan savollar. Ushbu savollarga javoblardan kelib chiqadigan tushunchalarga asoslanib, biz mahsulotni yaxshilashimiz va foydalanuvchilarning faolligini oshirishimiz mumkin.

Beam ushbu turdagi mashqlar uchun juda foydali va boshqa bir qator qiziqarli foydalanish holatlariga ham ega. Misol uchun, siz real vaqt rejimida birja ma'lumotlarini tahlil qilishni va tahlillar asosida savdolarni amalga oshirishni xohlashingiz mumkin, ehtimol sizda transport vositalaridan kelgan sensor ma'lumotlari bor va trafik darajasini hisoblashni xohlaysiz. Shuningdek, siz, masalan, foydalanuvchi ma'lumotlarini to'playdigan va undan asosiy ko'rsatkichlarni kuzatish uchun asboblar paneli yaratish uchun foydalanadigan o'yin kompaniyasi bo'lishingiz mumkin. Mayli, janoblar, bu boshqa post uchun mavzu, o'qiganingiz uchun rahmat va to'liq kodni ko'rishni istaganlar uchun quyida mening GitHub havolasi mavjud.

https://github.com/DFoly/User_log_pipeline

Hammasi shu. Birinchi qismni o'qing.

Manba: www.habr.com

a Izoh qo'shish