Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə

Hamıya salam. Kursun tələbələri üçün xüsusi hazırlanmış məqalənin yekun hissəsinin tərcüməsini paylaşırıq Məlumat Mühəndisi. Birinci hissəni tapmaq olar burada.

Real vaxt boru kəmərləri üçün Apache Beam və DataFlow

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə

Google Bulud quraşdırılır

Qeyd: Python 3-də boru kəmərini idarə etməkdə çətinlik çəkdiyim üçün boru kəmərini idarə etmək və istifadəçi jurnalı məlumatlarını dərc etmək üçün Google Cloud Shell-dən istifadə etdim. Google Cloud Shell Apache Beam ilə daha uyğun olan Python 2-dən istifadə edir.

Boru kəmərini işə salmaq üçün parametrləri bir az qazmalıyıq. Daha əvvəl GCP-dən istifadə etməyənlər üçün bu işdə aşağıdakı 6 addımı tamamlamalısınız səhifə.

Bundan sonra biz skriptlərimizi Google Cloud Storage-ə yükləməli və onları Google Cloud Shel-ə köçürməliyik. Bulud yaddaşına yükləmə olduqca mənasızdır (təsviri tapa bilərsiniz burada). Fayllarımızı köçürmək üçün biz aşağıdakı Şəkil 2-də soldakı ilk işarəyə klikləməklə alətlər panelindən Google Cloud Shel-i aça bilərik.

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə
Şəkil 2

Faylları kopyalamaq və lazımi kitabxanaları quraşdırmaq üçün bizə lazım olan əmrlər aşağıda verilmişdir.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Verilənlər bazamızın və cədvəlimizin yaradılması

Bütün quraşdırma addımlarını tamamladıqdan sonra etməli olduğumuz növbəti şey BigQuery-də verilənlər bazası və cədvəl yaratmaqdır. Bunu etməyin bir neçə yolu var, lakin ən sadəsi əvvəlcə verilənlər toplusu yaradaraq Google Bulud konsolundan istifadə etməkdir. Aşağıdakı addımları izləyə bilərsiniz əlaqəsxemi ilə cədvəl yaratmaq. Süfrəmiz olacaq 7 sütun, hər bir istifadəçi jurnalının komponentlərinə uyğundur. Rahatlıq üçün, timelocal dəyişən istisna olmaqla, bütün sütunları sətirlər (sətir tipli) kimi təyin edəcəyik və onları əvvəllər yaratdığımız dəyişənlərə uyğun adlandıracağıq. Cədvəlimizin tərtibatı Şəkil 3-ə bənzəməlidir.

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə
Şəkil 3. Cədvəl sxemi

İstifadəçi jurnalının məlumatlarının dərci

Pub/Sub bizim boru kəmərimizin kritik komponentidir, çünki o, çoxsaylı müstəqil proqramların bir-biri ilə əlaqə saxlamasına imkan verir. Xüsusilə, o, proqramlar arasında mesaj göndərmək və qəbul etmək imkanı verən vasitəçi kimi işləyir. Etməli olduğumuz ilk şey mövzu (mövzu) yaratmaqdır. Sadəcə olaraq konsolda Pub/Sub-a keçin və MÖVZU YARAT düyməsini klikləyin.

Aşağıdakı kod yuxarıda müəyyən edilmiş log məlumatlarını yaratmaq üçün skriptimizi çağırır və sonra qeydləri Pub/Sub-a birləşdirərək göndərir. Etməli olduğumuz yeganə şey bir obyekt yaratmaqdır PublisherClient, metoddan istifadə edərək mövzu yolunu göstərin topic_path və funksiyanı çağırın publish с topic_path və məlumatlar. Nəzərə alın ki, biz idxal edirik generate_log_line skriptimizdən stream_logsbuna görə də həmin faylların eyni qovluqda olduğundan əmin olun, əks halda idxal xətası alacaqsınız. Daha sonra bunu istifadə edərək google konsolumuzda işlədə bilərik:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Fayl işə salındıqdan sonra, aşağıdakı şəkildə göstərildiyi kimi, log məlumatlarının konsola çıxışını müşahidə edə biləcəyik. Biz istifadə etmədiyimiz müddətdə bu skript işləyəcək CTRL + Ctamamlamaq üçün.

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə
Şəkil 4. Nəticə publish_logs.py

Boru Kəməri Kodumuzu Yazırıq

İndi hər şeyi qurduq, biz əyləncəli hissəyə keçə bilərik - Beam və Python istifadə edərək boru kəmərimizin kodlaşdırılması. Beam boru kəməri yaratmaq üçün bir boru kəməri obyekti yaratmalıyıq (p). Bir boru kəməri obyekti yaratdıqdan sonra operatordan istifadə edərək bir-birinin ardınca bir neçə funksiya tətbiq edə bilərik pipe (|). Ümumiyyətlə, iş prosesi aşağıdakı şəkildəki kimi görünür.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Kodumuzda istifadəçi tərəfindən təyin edilmiş iki funksiya yaradacağıq. Funksiya regex_clean, məlumatı skan edir və funksiyadan istifadə edərək PATTERNS siyahısına əsasən uyğun sətri çıxarır re.search. Funksiya vergüllə ayrılmış sətir qaytarır. Əgər müntəzəm ifadələr üzrə mütəxəssis deyilsinizsə, bunu yoxlamağı məsləhət görürəm. dərslik və kodu sınamaq üçün notepadda məşq edin. Sonra adlanan xüsusi ParDo funksiyasını təyin edirik Split, bu paralel emal üçün Beam transformasiyasının variasiyasıdır. Python-da bu, xüsusi üsulla həyata keçirilir - biz DoFn Beam sinfindən miras qalan sinif yaratmalıyıq. Split funksiyası təhlil edilmiş sətri əvvəlki funksiyadan götürür və BigQuery cədvəlimizdəki sütun adlarına uyğun düymələri olan lüğətlərin siyahısını qaytarır. Bu funksiya haqqında qeyd etmək lazım olan bir şey var: idxal etməli oldum datetime işləməsi üçün funksiya daxilində. Faylın əvvəlində idxal zamanı xəta alırdım, bu qəribə idi. Bu siyahı daha sonra funksiyaya ötürülür WriteToBigQuery, bu sadəcə məlumatlarımızı cədvələ əlavə edir. Batch DataFlow Job və Streaming DataFlow Job kodu aşağıda verilmişdir. Toplu və axın kodu arasındakı yeganə fərq, topluda CSV-ni oxumağımızdır src_pathfunksiyasından istifadə etməklə ReadFromText Beam-dən.

Batch DataFlow İşi (toplu işlənmə)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Axın DataFlow İşi (axın emal)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Boru kəmərinin işə salınması

Boru kəmərini bir neçə fərqli yolla başlaya bilərik. İstəsək, uzaqdan GCP-yə daxil olmaqla onu terminaldan yerli olaraq işlədə bilərdik.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Bununla belə, biz onu DataFlow ilə işləyəcəyik. Bunu aşağıdakı tələb olunan parametrləri təyin etməklə aşağıdakı əmrlə edə bilərik.

  • project - GCP layihənizin ID-si.
  • runner proqramınızı təhlil edəcək və boru kəmərinizi quracaq bir boru kəməridir. Buludda işləmək üçün DataflowRunner təyin etməlisiniz.
  • staging_location işi yerinə yetirən işləyicilərə lazım olan kod paketlərinin indeksləşdirilməsi üçün Cloud Dataflow bulud yaddaşına gedən yoldur.
  • temp_location - Boru kəmərinin istismarı zamanı yaradılmış müvəqqəti iş fayllarının yerləşdirilməsi üçün Cloud Dataflow bulud yaddaşına gedən yol.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Bu əmr işləyərkən biz google konsolunda DataFlow sekmesine keçib boru xəttimizə baxa bilərik. Boru kəmərinə klikləməklə, biz Şəkil 4-ə bənzər bir şey görməliyik. Sazlama məqsədləri üçün ətraflı jurnalları görmək üçün jurnallara, sonra isə Stackdriver-ə getmək çox faydalı ola bilər. Bu, mənə bir sıra hallarda boru kəməri ilə bağlı problemləri həll etməyə kömək etdi.

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə
Şəkil 4: Şüa boru kəməri

BigQuery-də məlumatlarımıza daxil olmaq

Beləliklə, cədvəlimizə daxil olan məlumatlarla işləyən bir boru kəmərimiz olmalıdır. Bunu yoxlamaq üçün BigQuery-ə keçib dataya baxa bilərik. Aşağıdakı əmrdən istifadə etdikdən sonra məlumat dəstinin ilk bir neçə sətirini görməlisiniz. Artıq BigQuery-də saxlanılan dataya sahib olduğumuz üçün biz əlavə təhlillər apara, həmçinin məlumatları həmkarlarımızla paylaşa və biznes suallarını cavablandırmağa başlaya bilərik.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Biz verilənlərin axın emalı üçün boru xətti yaradırıq. 2-ci hissə
Şəkil 5: BigQuery

Nəticə

Ümid edirik ki, bu yazı axın məlumat kəmərinin qurulması, eləcə də məlumatları daha əlçatan etmək yollarını tapmaq üçün faydalı bir nümunə olacaq. Məlumatların bu formatda saxlanması bizə bir çox üstünlüklər verir. İndi məhsulumuzdan neçə nəfər istifadə edir kimi vacib suallara cavab verməyə başlaya bilərik? İstifadəçi bazası zaman keçdikcə artır? İnsanlar məhsulun hansı aspektləri ilə daha çox əlaqə qururlar? Və olmamalı olduğu yerlərdə səhvlər varmı? Bunlar təşkilatı maraqlandıracaq suallardır. Bu suallara verilən cavablardan əldə edilən fikirlərə əsaslanaraq, biz məhsulu təkmilləşdirə və istifadəçinin əlaqəsini artıra biləcəyik.

Beam bu cür məşq üçün həqiqətən faydalıdır və bir sıra digər maraqlı istifadə hallarına da malikdir. Məsələn, siz real vaxt rejimində səhm gənə məlumatlarını təhlil edə və təhlil əsasında ticarət edə bilərsiniz, bəlkə də nəqliyyat vasitələrindən gələn sensor məlumatlarınız var və trafik səviyyəsinin hesablanmasını hesablamaq istəyə bilərsiniz. Siz həmçinin, məsələn, istifadəçi məlumatlarını toplayan və əsas ölçüləri izləmək üçün idarə panelləri yaratmaq üçün istifadə edən bir oyun şirkəti ola bilərsiniz. Yaxşı cənablar, bu başqa yazı üçün mövzudur, oxuduğunuz üçün təşəkkürlər və tam kodu görmək istəyənlər üçün aşağıda mənim GitHub-a keçid var.

https://github.com/DFoly/User_log_pipeline

Bütün bunlar. Birinci hissəni oxuyun.

Mənbə: www.habr.com

Добавить комментарий