Nos rivum notitia processus pipeline creamus. Pars I

Hi omnes. Nos translationem ultimae partis articuli communicamus, quae specialiter studentibus curriculi praeparata est. Data Engineer. Primam partem legere potes hic.

Apache Beam et DataFlow ad Verus-Tempus Pipelines

Nos rivum notitia processus pipeline creamus. Pars I

Profecta Google Cloud

Nota: Google Cloud Testa usus sum ad pipelineum currendum et ad consuetum stipes notitias publicandas quod in Pythone fistulam currendo laborarem 3. Google Cloud Testa Pythone 2 utitur, quod apache Tigno constantius est.

Pipeline incipere, opus est paululum in uncinis fodere. Eorum enim vestrum qui ante GCP non usi sunt, necesse est ut sequentes 6 gradus in hac delineata sequi debebis page.

Post haec, scripta nostra ad Google Cloud Repono et ad Google Cloud Shel transcribere necesse est. Discas nubem repono est admodum leve (descriptio inveniri potest hic). Ad tabellas nostras imitandas, possumus Google Cloud Shel aperire e instrumento e strepitando iconem primam in sinistra in Figura 2 infra.

Nos rivum notitia processus pipeline creamus. Pars I
figure 2

Mandata nobis necessaria sunt ut tabellas effingas et instituas requisita bibliothecarum quae infra recensentur.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creando nostrum database et mensam

Cum perfecerimus omnes gradus iunctis setup, proximus nobis opus est creare dataset ac mensa in BigQuery. Plures modi ad hoc faciunt, sed simplicissimus est Google Cloud consolatorium utendi per primam dataset creando. Sequi potes gradus infra Linkschema ad mensam cum creare. Nostra mensa erit 7 columnascomponentibus singulis usorum usorum respondentibus. Pro commodo, omnes columnas chordas definiemus, praeter temporis variabiles, easque secundum variabiles antea generatas nominabimus. Mensae nostrae assatio tamquam in Figura III spectare debet.

Nos rivum notitia processus pipeline creamus. Pars I
Figure 3. Tabula layout

Iniuriarum notitia publici usoris

Pub/Sub pars critica nostri pipelini est quia plures applicationes independentes inter se communicare sinit. Peculiariter, intermedium operatur, qui sinit nuntios inter applicationes mittere ac recipere. Primum, quod opus est facere, est thema creare. Simpliciter ire ad Pub/Sub in console et deprime ARGUMENTUM CREATE.

Codex infra scriptos nostros vocat ad datam stipem supra definitam generandam ac deinde connectit ac mittit ad Pub/Sub. Sola res nobis opus est facere objectum creare PublisherClient, specificare viam ad thema utendi modum topic_path et vocant munus publish с topic_path et data. Lorem quod importamus generate_log_line ex nostris scriptor stream_logsfac ergo haec fascicula in eodem folder, alioquin errorem importare accipies. Hoc ergo currere possumus per consolatorium nostrum google utens:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Quamprimum tabella decurrit, videre poterimus in notitia stipes output ad consolatorium, ut in figura infra ostendetur. Hoc scriptum erit opus quod modo non utimur CTRL + Cad complendum.

Nos rivum notitia processus pipeline creamus. Pars I
Figure 4. Output publish_logs.py

Nostrum pipeline scribebat codice

Nunc ut omnia parata habeamus, partem iocum - coding nostro pipeline utentes Tigno et Pythone incipiemus. Ut trabem pipeline crearet, pipelinem creare opus est (p). Postquam obiectum pipelinum creavimus, multiplices functiones unum post alium uti operatorem adhibere possumus pipe (|). In genere, tincidunt imago infra spectat.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

In nostro codice duo consuetudines functionum creabimus. Officium regex_clean, qui notitias perlustrat et invenit ordinem respondentem secundum PATTERNS indicem utens munere re.search. Munus refert comma separatum chorda. Si regularis expressionis peritus non es, hoc annotando commendo doceo et usu in notula ad codicem reprimendum. Post hoc definimus consuetudinem ParDo munus vocatum Splitquae est variatio Trabi pro processu parallelo transformato. In Pythone, hoc speciali modo fit - creare debemus genus quod a DoFn Beam genus possidet. Munus Spalatum accipit ordinem parsed a functione praecedente et redit indicem dictionariorum cum clavibus ad columnam nominum in tabula nostra BigQuery respondentem. Est aliquid notandum de hoc munere: mihi importat datetime intra munus facere opus est. Errorem importans in initio tabellae nanctus sum, quod fatum erat. Hoc index tum ad munus transiit WriteToBigQueryquae simpliciter addit ad mensam nostram datam. Codex Batch DataFlow Iob et Streaming DataFlow Iob infra datur. Sola differentia inter batch et codicem exundans est, quod in batch legitur CSV a src_pathper munus ReadFromText de Tigno.

Batch DataFlow Job (batch processus)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

DataFlow Job streaming (amnis processus)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Satus TRADUCTOR

Pipeline pluribus modis currere possumus. Si vellemus, modo a termino in GCP colligationem in longinquo localiter currere possemus.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Sed imus ad utens DataFlow currere. Hoc facere possumus utendo imperio inferiorem ponendo sequentes ambitum requisitum.

  • project — ID propositi tui GCP.
  • runner cursor pipeline est qui propositum tuum resolvere ac fistulam tuam construere. In nube currere, DataflowRunner notare debes.
  • staging_location - iter ad nubem Dataflow nubis repositam ad indicendum codicem fasciculis necessariis processoribus operi faciendo.
  • temp_location - iter ad nubem Dataflow nubes reposita ad condendum officium temporale lima creata dum pipelineum currit.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Dum hoc mandatum currit, ad DataFlow tab in google consolatorium ire possumus et fistulam nostram spectare. Cum strepita in pipelino, aliquid simile figurae videre debemus 4. Ad debugging usus, perquam utile est ire ad Logs et deinde ad Stackdriver ad singula ligna explorare. Hoc me adiuvit quaestiones pipelines componendas in pluribus casibus.

Nos rivum notitia processus pipeline creamus. Pars I
Figura IV: Trabem TRADUCTOR

Accessere nostra notitia in BigQuery

Sic iam habere deberemus pipelineum currentem cum notitia in nostram mensam fluentem. Hoc probare, BigQuery adire possumus et notitias intueri. Cum imperio uti infra paucos primos ordines dataset videre debes. Nunc quod notitia in BigQuery condita habemus, ulteriorem analysin ducere possumus, ac notitia communicare cum collegis et quaestionibus negotiis respondere committitur.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Nos rivum notitia processus pipeline creamus. Pars I
Figure 5: BigQuery

conclusio,

Speramus haec epistula exemplum utilem creandi datas fistulas profusas, ac vias inveniendi ad notitias magis pervias faciendas. In hac forma repono notitias multa nobis tribuit utilitates. Nunc incipimus respondere quaestionibus magnis quales quot homines utuntur nostra producta? Crescente basi user tuus in tempore? Quid facies productum quod homines inter se occurrunt cum maxime? Et sunt errores ubi non debet esse? Hae sunt interrogationes quae ad ordinationem commodae erunt. Ex perceptis quae ex responsionibus ad has quaestiones emergunt, nos meliorem fieri et augere pugnae usuario possumus.

Trabs vere utilis est ad hoc genus exercitationis et plures alias casus commodas usui habet. Exempli gratia, notitias stirpis ricinum in tempore reali resolvere vis, et artificia facere in analysi fundata, fortasse sensorem datam e vehiculis venientem habes et computare vis negotiationis gradus calculi. Potuisti etiam, exempli gratia, esse turmam lusoriam quae notitias usorum colligit et utitur eo ad ashboardday creare ad clavem metricam indagare. Bene, iudices, haec est thema alterius postis, gratiae lectionis, et iis qui plenam codicem videre cupiunt, infra nexus cum GitHub meo est.

https://github.com/DFoly/User_log_pipeline

Quod suus omnes. Read partem unum.

Source: www.habr.com

Add a comment