Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2

Salama daholo. Mizara ny fandikana ny tapany farany amin'ny lahatsoratra izahay, izay nomanina manokana ho an'ny mpianatry ny taranja. Data Engineer. Azonao atao ny mamaky ny ampahany voalohany eto.

Apache Beam sy DataFlow ho an'ny fantsona tena fotoana

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2

Fametrahana Google Cloud

Fanamarihana: Nampiasa Google Cloud Shell aho mba hampandehanana ny fantsona ary hamoaka ny angon-drakitra manokana satria sahirana amin'ny fampandehanana ny fantsona amin'ny Python 3. Google Cloud Shell dia mampiasa Python 2, izay mifanaraka kokoa amin'ny Apache Beam.

Mba hanombohana ny fantsona dia mila mihady kely amin'ny toe-javatra isika. Ho anao izay mbola tsy nampiasa GCP teo aloha, dia mila manaraka ireto dingana 6 manaraka ireto ianao pejy.

Aorian'izany dia mila mampakatra ny scripty any amin'ny Google Cloud Storage izahay ary mandika azy ireo any amin'ny Google Cloud Shel. Ny fampiakarana amin'ny fitahirizana rahona dia tsy dia misy dikany loatra (misy famaritana eto). Raha handika ny rakitray, dia afaka manokatra Google Cloud Shel avy amin'ny fitaovana izahay amin'ny fipihana ny kisary voalohany eo ankavia amin'ny sary 2 etsy ambany.

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2
2 Figure

Ny baiko ilaintsika handikana ireo rakitra sy fametrahana ireo tranomboky ilaina dia voatanisa etsy ambany.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Mamorona ny angonay sy ny latabatra

Rehefa vita ny dingana rehetra mifandraika amin'ny fananganana, ny zavatra manaraka tokony hataontsika dia ny mamorona angona sy latabatra ao amin'ny BigQuery. Misy fomba maro hanaovana izany, fa ny tsotra indrindra dia ny fampiasana ny Google Cloud Console amin'ny famoronana daty voalohany. Azonao atao ny manaraka ny dingana etsy ambany rohyhamorona latabatra misy skema. Hisy ny latabatray 7 andry, mifanandrify amin'ireo singa ao amin'ny diarin'ny mpampiasa tsirairay. Ho fanamorana dia hofaritantsika ho tady avokoa ny tsanganana rehetra, afa-tsy ny fari-potoana eo an-toerana, ary omena anarana araka ny fari-piainana novokarinay teo aloha. Ny fisehon'ny latabatray dia tokony hitovy amin'ny sary 3.

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2
Sary 3. Fandrafetana latabatra

Famoahana ny angon'ny diarin'ny mpampiasa

Ny Pub/Sub dia singa manan-danja amin'ny fantsonay satria mamela fampiharana tsy miankina maro hifampiresaka. Indrindra indrindra, miasa toy ny mpanelanelana ahafahantsika mandefa sy mandray hafatra eo anelanelan'ny fampiharana. Ny zavatra voalohany tokony hataontsika dia ny mamorona lohahevitra iray. Mandehana fotsiny amin'ny Pub/Sub ao amin'ny console ary tsindrio CREATE TOPIC.

Ny kaody etsy ambany dia miantso ny scripty mba hamoronana ny angon-drakitra voalaza etsy ambony ary avy eo mampifandray sy mandefa ny logs amin'ny Pub/Sub. Ny hany tokony hataontsika dia ny mamorona zavatra iray PublisherClient, mamaritra ny lalana mankany amin'ny lohahevitra mampiasa ny fomba topic_path ary miantso ny asa publish с topic_path ary data. Marihina fa manafatra izahay generate_log_line avy amin'ny scripty stream_logs, koa ataovy azo antoka fa ao amin'ny lahatahiry iray ihany ireo rakitra ireo, raha tsy izany dia hahazo hadisoana fanafarana ianao. Azontsika atao izany amin'ny alàlan'ny console google amin'ny fampiasana:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Raha vao mandeha ny rakitra dia ho hitantsika ny fivoahan'ny angon-drakitra mankany amin'ny console, araka ny aseho amin'ny sary etsy ambany. Ity script ity dia hiasa raha mbola tsy ampiasainay Ctrl + Chamita azy.

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2
Sary 4. Output publish_logs.py

Manoratra ny kaodin-tsofinay

Efa voaomanantsika izao ny zava-drehetra, afaka manomboka ny ampahany mahafinaritra isika - fametahana ny fantsona amin'ny alΓ lan'ny Beam sy Python. Mba hamoronana fantsona Beam dia mila mamorona zavatra fantsona (p). Raha vao namorona zavatra pipeline isika, dia afaka mampihatra asa maro isan-karazany amin'ny fampiasana ny operator pipe (|). Amin'ny ankapobeny dia mitovy amin'ny sary etsy ambany ny fizotran'ny asa.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Amin'ny kaody ataontsika dia hamorona fiasa roa mahazatra isika. asa regex_clean, izay mijery ny angon-drakitra ary maka ny laharana mifanaraka amin'izany mifototra amin'ny lisitry ny PATTERNS mampiasa ny fiasa re.search. Ny fiasa dia mamerina tady misaraka faingo. Raha tsy manam-pahaizana manokana momba ny fanehoan-kevitra mahazatra ianao dia manoro hevitra aho hanamarina ity fampianarana ary manao fanazaran-tena amin'ny notepad mba hanamarinana ny kaody. Aorian'izany dia mamaritra ny fiasa ParDo mahazatra antsoina hoe Saraho, izay fiovaovan'ny fiovan'ny Beam ho an'ny fanodinana parallèle. Amin'ny Python dia atao amin'ny fomba manokana izany - tsy maintsy mamorona kilasy mandova avy amin'ny kilasy DoFn Beam isika. Ny fiasan'ny Split dia maka ny andalana voasakana avy amin'ny fiasa teo aloha ary mamerina ny lisitry ny rakibolana misy fanalahidy mifanaraka amin'ny anaran'ny tsanganana ao amin'ny latabatra BigQuery. Misy zavatra tokony homarihina momba ity fiasa ity: tsy maintsy nanafatra aho datetime ao anatin'ny fiasa iray mba hampandeha azy. Nahazo fahadisoana fanafarana aho tany am-piandohan'ny rakitra, izay hafahafa. Ity lisitra ity dia alefa any amin'ny asa WriteToBigQuery, izay manampy fotsiny ny angonay amin'ny latabatra. Ny kaody ho an'ny Batch DataFlow Job sy ny Streaming DataFlow Job dia omena etsy ambany. Ny hany maha samy hafa ny batch sy ny streaming code dia ny batch mamaky ny CSV avy src_pathmampiasa ny asa ReadFromText avy amin'ny Beam.

Batch DataFlow Job (fanodinana andiany)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (fikarakarana ny stream)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Manomboka ny conveyor

Afaka mampandeha ny pipeline amin'ny fomba maro samihafa isika. Raha tiana dia azonay atao ny mampandeha azy eo an-toerana avy amin'ny terminal iray rehefa miditra amin'ny GCP lavitra.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Na izany aza, hampiasa izany amin'ny DataFlow izahay. Azontsika atao izany amin'ny alΓ lan'ny baiko etsy ambany amin'ny alΓ lan'ny fametrahana ireto fepetra takiana manaraka ireto.

  • project - ID ny tetikasa GCP-nao.
  • runner dia mpihazakazaka fantsona izay hamakafaka ny programanao sy hanorina ny fantsonao. Raha te hihazakazaka amin'ny rahona ianao dia tsy maintsy mamaritra DataflowRunner.
  • staging_location β€” ny lalana mankany amin'ny fitehirizana rahona Cloud Dataflow ho an'ny fonosana kaody fanondroana ilain'ireo mpanodina manao ny asa.
  • temp_location - lalana mankany amin'ny fitehirizana rahona Cloud Dataflow ho fitehirizana ireo rakitra asa vonjimaika noforonina rehefa mandeha ny fantsona.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Raha mandeha io baiko io dia afaka mandeha any amin'ny tabilao DataFlow ao amin'ny console google isika ary mijery ny fantsonay. Rehefa manindry ny fantsona isika, dia tokony hahita zavatra mitovy amin'ny sary 4. Ho an'ny tanjona debug, dia tena manampy tokoa ny mandeha any amin'ny Logs ary avy eo mankany amin'ny Stackdriver raha hijery ireo logs amin'ny antsipiriany. Izany dia nanampy ahy hamaha ny olan'ny fantsona amin'ny tranga maromaro.

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2
Sary 4: Beam conveyor

Midira ao amin'ny BigQuery ny angonay

Noho izany, tokony efa manana fantsona mandeha miaraka amin'ny angona mikoriana ao amin'ny latabatray isika. Mba hitsapana izany dia afaka mandeha any amin'ny BigQuery isika ary mijery ny angona. Rehefa avy mampiasa ny baiko etsy ambany ianao dia tokony hahita andalana vitsivitsy voalohany amin'ny angona. Rehefa manana ny angona voatahiry ao amin'ny BigQuery izahay izao, dia afaka manao famakafakana bebe kokoa, ary mizara ny angona amin'ny mpiara-miasa ary manomboka mamaly fanontaniana momba ny raharaham-barotra.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Mamorona fantsona fanodinana angon-drakitra izahay. Fizarana 2
Sary 5: BigQuery

famaranana

Manantena izahay fa ity lahatsoratra ity dia ho ohatra mahasoa amin'ny famoronana fantsona angon-drakitra mivantana, ary koa fitadiavana fomba hahatonga ny angona ho mora idirana kokoa. Ny fitehirizana angona amin'ity endrika ity dia manome tombony maro ho antsika. Ankehitriny isika dia afaka manomboka mamaly fanontaniana manan-danja toy ny firy ny olona mampiasa ny vokatra? Mitombo ve ny mpampiasa anao rehefa mandeha ny fotoana? Inona avy ireo lafiny amin'ny vokatra ifandrimbonan'ny olona indrindra? Ary misy lesoka tsy tokony hisy ve? Ireo no fanontaniana hahaliana ny fikambanana. Miorina amin'ny fomba fijery mipoitra avy amin'ny valin'ireo fanontaniana ireo, afaka manatsara ny vokatra isika ary mampitombo ny fidiran'ny mpampiasa.

Beam dia tena ilaina amin'ity karazana fanazaran-tena ity ary manana tranga fampiasana mahaliana hafa koa. Ohatra, azonao atao ny manadihady ny angon-drakitra momba ny tahiry amin'ny fotoana tena izy ary manao varotra mifototra amin'ny famakafakana, angamba manana angona sensor avy amin'ny fiara ianao ary te hanao kajy ny haavon'ny fifamoivoizana. Azonao atao ihany koa, ohatra, ny ho orinasa lalao manangona angon-drakitra mpampiasa ary mampiasa azy io hamoronana dashboard hanaraha-maso ireo metrika fototra. Okay, tompokolahy, lohahevitra ho an'ny lahatsoratra hafa ity, misaotra amin'ny famakiana, ary ho an'ireo izay te-hahita ny kaody feno, eto ambany ny rohy mankany amin'ny GitHub-ko.

https://github.com/DFoly/User_log_pipeline

Izay ihany. Vakio ny ampahany voalohany.

Source: www.habr.com

Add a comment