Building a data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of this article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I had problems running the pipeline with Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.

To get the pipeline up and running, we need to dig into the settings a little. For those of you who have never used GCP before, you will need to go through the 6 steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to create the dataset through the Google Cloud console. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the fields of each user log. For simplicity, we will define all of the columns as strings, apart from the time variable, and name them according to the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
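If you prefer to script this step rather than click through the console, the dataset and table can also be created with the BigQuery client library we install above. The snippet below is only a sketch, assuming the userlogs dataset and logdata table names that the pipeline writes to later, default Cloud Shell credentials, and the older client API that matches the Python 2 setup used here.

from google.cloud import bigquery

client = bigquery.Client(project='user-logs-237110')  # your GCP project ID

# Create the dataset that will hold the log table
dataset_ref = client.dataset('userlogs')
client.create_dataset(bigquery.Dataset(dataset_ref))

# Define the 7 STRING columns matching the parsed log fields
columns = ['remote_addr', 'timelocal', 'request_type', 'status',
           'body_bytes_sent', 'http_referer', 'http_user_agent']
schema = [bigquery.SchemaField(name, 'STRING') for name in columns]

# Create the logdata table with that schema
table_ref = dataset_ref.table('logdata')
client.create_table(bigquery.Table(table_ref, schema=schema))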

Publishing the user log data

Pub/Sub is a vital part of our pipeline because it allows multiple independent applications to interact with each other. Specifically, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
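If you would rather create the topic from code than from the console, the pre-2.0 Pub/Sub client library installed above can do the same thing. This is a minimal sketch, assuming the project ID and topic name used in the scripts below.

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

# Build the fully qualified topic path and create the topic
topic_path = publisher.topic_path('user-logs-237110', 'userlogs')
publisher.create_topic(topic_path)
print('Created topic: {}'.format(topic_path))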

The code below calls the script we wrote to generate the log data described above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same directory or you will get an import error. We can then run this in our Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
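For reference, generate_log_line comes from the stream_logs.py script built in the first part of the article. The sketch below is only an illustration of what such a generator can look like with the Faker library we installed earlier; the exact fields and format are the ones defined in part 1.

# stream_logs.py (illustrative sketch; the real script is described in part 1)
from faker import Faker
import random
import time

faker = Faker()

def generate_log_line():
    # Assemble a single Apache-style log line from fake values
    time_local = time.strftime('%d/%b/%Y:%H:%M:%S', time.localtime())
    request = 'GET {} HTTP/1.1'.format(faker.uri_path())
    status = random.choice(['200', '404', '500'])
    body_bytes_sent = str(random.randint(200, 5000))
    return '{} - - [{}] "{}" {} {} "{}" "{}"'.format(
        faker.ipv4(), time_local, request, status, body_bytes_sent,
        faker.uri(), faker.user_agent())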

As soon as the publisher script is running, we will be able to see the log data being output to the console, as shown in the figure below. The script will keep running until we use CTRL+C to stop it.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that we have everything set up, we can get to the fun part: writing our pipeline code using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
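As a toy illustration of this chaining (separate from the article's pipeline), the sketch below builds a pipeline object and applies a few transforms with the pipe operator, printing the results with the default local runner.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def print_element(element):
    # Print each element and pass it through unchanged
    print(element)
    return element

p = beam.Pipeline(options=PipelineOptions())

(p
   | 'CreateInput' >> beam.Create(['10', '20', '30'])  # initial input PCollection
   | 'ToInt' >> beam.Map(int)                          # first transform
   | 'Double' >> beam.Map(lambda x: x * 2)             # second transform
   | 'Print' >> beam.Map(print_element))               # final output

p.run().wait_until_finish()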

In our code we create two custom functions. The regex_clean function scans the data and extracts the matching string for each entry in the PATTERNS list using re.search, and returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at this tutorial and practising in a notepad to check the code. After that we define a custom ParDo function called Split, which is a Beam transform for parallel processing. In Python this is done in a particular way: we have to create a class that inherits from the Beam DoFn class. The Split function takes the parsed string from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work; I was getting an import error at the top of the file, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table.

The code for the Batch DataFlow job and the Streaming DataFlow job is given below. The only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.

Batch DataFlow job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Starting the pipeline

We can run the pipeline in several different ways. If we wanted, we could simply run it locally from the terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it with DataFlow. We can do this with the command below, setting the following required parameters.

  • project - your GCP project ID.
  • runner - the pipeline runner that will analyze your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow cloud storage for indexing the code packages needed by the workers performing the job.
  • temp_location - the path to Cloud Dataflow cloud storage for storing temporary files created while the pipeline is running.
  • streaming - pass this flag to run the pipeline in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google console and look at our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me sort out pipeline issues on a number of occasions.

Figure 4: The Beam pipeline

Accessing our data in BigQuery

So we should now have a pipeline streaming data into our table. To test this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can run further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
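The same check can also be run programmatically with the BigQuery client library, which is handy if you want to pull the results into a script or notebook for further analysis. A minimal sketch, assuming the table created earlier:

from google.cloud import bigquery

client = bigquery.Client(project='user-logs-237110')
query = 'SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10'

# Run the query and print a few fields from each row
for row in client.query(query).result():
    print('{} {} {}'.format(row.remote_addr, row.timelocal, row.status))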

Figure 5: BigQuery

Conclusion

Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And are there errors where there shouldn't be? These are the questions that will interest the organization. Based on the insights from the answers to these questions, we can improve the product and increase user engagement.

Beam is genuinely useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming from vehicles and want to calculate traffic-level metrics. You could also, say, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. OK folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
