Building a streaming data processing pipeline. Part 2

Hello! We are sharing a translation of the final part of this article, prepared especially for students of the Data Engineering course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Data Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to launch the pipeline and publish the custom log data, because I had problems running the pipeline under Python 3. Google Cloud Shell uses Python 2, which plays much more nicely with Apache Beam.

To get the pipeline up and running, we need to dig into the settings a little. Those of you who have not used GCP before will need to go through the 6 steps outlined on this page.

After that, we will need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is trivial (an explanation can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need in order to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
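
If you would rather handle the upload step from the command line too, a one-liner along these lines should do it from your local machine (assuming your scripts sit in the current directory; the bucket name is a placeholder):

# Upload the scripts to cloud storage
gsutil cp *.py gs://<YOUR-BUCKET>/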

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console, creating a dataset first. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all of the columns as strings, apart from the time variable, and name them after the variables we generated earlier. Our table schema should look like Figure 3 (a command-line sketch follows the figure).

Figure 3. Table schema
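
If you prefer the command line to the console, a sketch like the one below should create the same dataset and table with the bq tool (the dataset and table names match those used in the pipeline code later, and the schema string mirrors the one we pass to WriteToBigQuery):

# Create the dataset, then the table with its schema
bq mk userlogs
bq mk --table userlogs.logdata remote_addr:STRING,timelocal:STRING,request_type:STRING,status:STRING,body_bytes_sent:STRING,http_referer:STRING,http_user_agent:STRING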

Publishing the user log data

Pub/Sub is a vital component of our pipeline, because it allows multiple independent applications to interact with one another. In particular, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. That is as simple as going to Pub/Sub in the console and clicking CREATE TOPIC.
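
If you would rather skip the console, the same topic can be created from the command line (the topic name matches the TOPIC constant used in the publisher code below):

# Create the Pub/Sub topic
gcloud pubsub topics create userlogs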

The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs on. All we need to do is create a PublisherClient object, define the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from the google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Encode the line as UTF-8 bytes and publish it to the given topic
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

As soon as the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we use CTRL+C to stop it.

Figure 4. Output of publish_logs.py

Coding up our pipeline

Now that everything is set up, we can get to the fun part: coding up our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
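
As a toy illustration of this pattern (not part of the article's pipeline; the element values and transform labels are made up), a tiny runnable pipeline that chains two transforms with the pipe operator looks like this:

import apache_beam as beam

# Create two elements, keep the first comma-separated field of each,
# and upper-case it; the with-block runs the pipeline on exit.
with beam.Pipeline() as p:
    (p
       | 'CreateData' >> beam.Create(['a,1', 'b,2'])
       | 'FirstField' >> beam.Map(lambda line: line.split(',')[0])
       | 'ToUpper' >> beam.Map(lambda s: s.upper()))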

In our code, we will create two custom functions. The regex_clean function parses the data and extracts the fields matching the PATTERNS list using the re.search function; it returns a comma-separated string. If you are not a regular expression expert, I recommend looking at this tutorial and playing around with the code in a notebook to check that it works. After that, we define a custom ParDo function called Split, which is Beam's variant of a parallel transform. In Python this is done in a special way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed row from the previous function and returns a list of dictionaries whose keys correspond to the column names of our BigQuery table. One thing worth noting about this function: I had to import datetime inside the function for it to work; I was getting an import error at the top of the file, which was strange. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the batch DataFlow job and the streaming DataFlow job is given below. The only difference between the batch and the streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.

Batch DataFlow job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        # Imported inside process: importing datetime at module level
        # caused an import error (see the note above)
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()
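
To try the batch version locally with the direct runner, saving the script as, say, main_pipeline_batch.py (a hypothetical filename; the article does not name the batch file) and running it without any DataFlow parameters should suffice, since src_path is hard-coded:

python main_pipeline_batch.py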

Streaming DataFlow job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Launching the pipeline

We can launch our pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to launch it with DataFlow instead. We can do that using the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will parse your program and construct the pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow cloud storage where the code packages needed by the workers performing the job are indexed.
  • temp_location - the path to Cloud Dataflow cloud storage where the temporary job files created while the pipeline runs will be stored.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the google console and view our pipeline. Clicking on the pipeline should show us something like Figure 4. For debugging purposes, it can be very helpful to go into Logs and then into Stackdriver to view the detailed logs; this has helped me sort out pipeline problems on several occasions (a command-line alternative is sketched below the figure).

Figure 4: The pipeline
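
The same Stackdriver logs can also be pulled from the terminal with the gcloud CLI; a sketch like the following (the filter is an assumption, adjust it to your job) reads the most recent Dataflow entries:

# Read recent log entries produced by Dataflow workers
gcloud logging read "resource.type=dataflow_step" --limit 20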

Accessing our data in BigQuery

So, we should now have a running pipeline with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that our data is stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

Hopefully this post serves as a useful example of building a streaming data pipeline, and of finding ways to make your data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions like: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there should not be any? These are the questions the organization will care about, and based on the insights drawn from their answers we can improve the product and increase user engagement.
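
For instance, a first stab at the "how many people use our product?" question could be a query like the one below (a rough sketch that treats remote_addr as a proxy for a user):

-- Daily unique visitors, using the IP address as a crude user proxy
SELECT DATE(TIMESTAMP(timelocal)) AS day,
       COUNT(DISTINCT remote_addr) AS unique_visitors
FROM `user-logs-237110.userlogs.logdata`
GROUP BY day
ORDER BY day;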

Beam is genuinely useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming in from vehicles and want to compute traffic-level estimates. You could also, say, be a gaming company collecting user data and using it to build dashboards that track key metrics. Okay, folks, that is a topic for another post. Thanks for reading! For those who want to see the full code, the GitHub link is below.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
