Building a streaming data processing pipeline. Part 2

Hello. We are sharing the translation of the final part of an article prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I was having trouble running the pipeline on Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.

To get the pipeline started, we need to dig around a bit in the settings. For those of you who have not used GCP before, you will need to follow the 6 steps outlined on this page.

After that, we will need to upload our scripts to Google Cloud Storage and copy them to Google Cloud Shell. Uploading to cloud storage is trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and first create a dataset. You can follow the steps at the link below to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings except for the timelocal variable, and name them according to the variables we created earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
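The pipeline code further down passes this same 7-column schema to BigQuery as a comma-separated string. A quick pure-Python sanity check of that string (illustrative only, the split logic below is not part of the pipeline):

```python
# The same schema string used later in the pipeline code.
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

# Split the "name:TYPE" pairs into (column, type) tuples.
columns = [tuple(field.strip().split(':')) for field in schema.split(',')]

print(len(columns))   # 7 columns, matching the table we just created
print(columns[0])     # ('remote_addr', 'STRING')
```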

Publishing user log data

Pub/Sub is a critical component of our pipeline because it allows multiple independent applications to interact with each other. Specifically, it works as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Just go to Pub/Sub in the console and click CREATE TOPIC.

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs to it. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this through the Google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we use Ctrl + C to terminate it.

Figure 4. Output of publish_logs.py
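The add_done_callback pattern in the publish loop above is the standard future interface from concurrent.futures, and the Pub/Sub client returns a future with a similar API. A local sketch without Pub/Sub (the submitted lambda merely stands in for publisher.publish):

```python
from concurrent.futures import ThreadPoolExecutor

results = []

def callback(future):
    # Mirrors the pipeline's callback: report an exception, otherwise take the result.
    if future.exception():
        results.append('error')
    else:
        results.append(future.result())

with ThreadPoolExecutor(max_workers=1) as pool:
    # The lambda stands in for publisher.publish(), which also returns a future.
    future = pool.submit(lambda: 'message-id-1')
    future.add_done_callback(callback)

print(results)  # ['message-id-1']
```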

Writing our pipeline code

Now that we have everything set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
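The pipe syntax works because Beam overloads the | operator on its collection types. A toy illustration of the same idea in plain Python (this is not Beam's actual implementation, just a sketch of the pattern):

```python
class Collection:
    # Minimal stand-in for a PCollection: | applies a transform, returning a new Collection.
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, transform):
        return Collection(transform(self.items))

# Each "transform" here is just a function from one list of items to another.
to_upper = lambda items: [x.upper() for x in items]
drop_short = lambda items: [x for x in items if len(x) > 3]

result = (Collection(["get", "post", "delete"])
          | to_upper
          | drop_short)

print(result.items)  # ['POST', 'DELETE']
```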

In our code, we will create two custom functions. The regex_clean function scans the data and extracts the matching piece for each entry in the PATTERNS list using re.search; the function returns a comma-separated string. If you are not a regular expression expert, I recommend looking at this tutorial and practising in a notepad to check the code. After that, we define a custom ParDo function called Split, which is a variant of the Beam transform for parallel processing. In Python, this is done in a special way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed line from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table. There is one thing to note about this function: I had to import datetime inside the function to make it work; I was getting an import error at the top of the file, which was strange. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow job and the Streaming DataFlow job is given below. The only difference between the batch and the streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.
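The timestamp handling inside Split is worth a quick standalone check: it re-parses the log's local-time format into the format BigQuery expects. A sketch of that conversion (the sample timestamp is made up):

```python
from datetime import datetime

# Hypothetical timestamp in the log's local-time format.
raw = "26/Apr/2019:10:21:04"

# Parse the log format, then re-serialise in BigQuery-friendly form,
# exactly as Split does per element.
d = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")

print(date_string)  # 2019-04-26 10:21:04
```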

Batch DataFlow job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()
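Because patterns like these are easy to get wrong, it helps to sanity-check a few of them against hand-made fragments before running the full pipeline. The fragments below are invented for illustration and only mimic the shapes the patterns should match:

```python
import re

# An IP-like token at the start of a line, followed by whitespace.
ip = re.search(r'(^\S+\.[\S+\.]+\S+)\s', '192.52.197.161 - -').group(1)

# The first bracketed section, e.g. the local timestamp.
ts = re.search(r'(?<=\[).+?(?=\])', '[26/Apr/2019:10:21:04]').group()

# The first number flanked by whitespace, e.g. a status code.
status = re.search(r'\s(\d+)\s', 'HTTP/1.1" 200 5124').group(1)

print(ip, ts, status)  # 192.52.197.161 26/Apr/2019:10:21:04 200
```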

Streaming DataFlow job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Running the pipeline

We can run the pipeline in several different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it on DataFlow. We can do this with the command below by setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will analyse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
  • temp_location - the path to Cloud Dataflow cloud storage for storing temporary job files created while the pipeline runs.
  • streaming - a flag that tells Dataflow to run the job in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues on a number of occasions.

Figure 4: DataFlow job

Accessing our data in BigQuery

So, we should now have a running pipeline with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

We hope this post has served as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? What aspects of the product do people interact with most? And are there errors where there should not be? These are the questions that will be of interest to the organisation. Based on the insights that emerge from the answers to these questions, we can improve the product and increase user engagement.

Beam is really useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyse stock tick data in real time and make trades based on the analysis; perhaps you have sensor data coming in from vehicles and you want to compute traffic-level calculations. You could also, for example, be a gaming company that collects user data and uses it to build dashboards for tracking key metrics. Okay folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

source: www.habr.com
