Building a data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of the article, prepared specially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines

Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I had trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which is more compatible with Apache Beam.

To get the pipeline running, we need to dig into the settings a little. Those of you who have not used GCP before will need to follow the 6 steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy files from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our database and table

Once we have completed all the setup steps, the next thing to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console, creating the dataset first. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings, except for the timelocal variable, and name them according to the variables we generated earlier. Our table layout should look like Figure 3.
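The pipeline code later in this article passes the table schema to BigQuery as a single comma-separated string of name:TYPE pairs, with every column (including timelocal) typed as STRING. As a minimal sketch, that string can be built from a list of column names so the names live in one place. The helper name build_schema is hypothetical and not part of the original code.

```python
# Column names as used in the pipeline code of this article.
COLUMNS = [
    "remote_addr",
    "timelocal",
    "request_type",
    "status",
    "body_bytes_sent",
    "http_referer",
    "http_user_agent",
]


def build_schema(columns, column_type="STRING"):
    """Return a 'name:TYPE, name:TYPE' schema string (hypothetical helper)."""
    return ", ".join("{0}:{1}".format(name, column_type) for name in columns)


schema = build_schema(COLUMNS)
print(schema)
```

If a column needs a different type, it is probably simpler to write that one pair by hand than to extend the helper.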

Figure 3. Table schema

Publishing user log data

Pub/Sub is an essential component of our pipeline because it lets multiple independent applications communicate with each other. Specifically, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Just go to Pub/Sub in the console and click CREATE TOPIC.

The code below calls our script to generate the log data described above, then connects and sends the logs to Pub/Sub. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub expects the payload as bytes.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # result() returns the message ID assigned by Pub/Sub.
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

As soon as the file runs, we will see the log data printed to the console, as shown in the figure below. The script keeps running until we press CTRL+C to stop it.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that everything is set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the snippet below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
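To make the | and 'label' >> syntax less mysterious, here is a toy sketch in plain Python that mimics the chaining with operator overloading. It is not how Beam is implemented (Beam builds a deferred graph and executes it later, while this toy runs each step immediately), and the names Collection, Transform, Map and Filter are hypothetical stand-ins defined only for this illustration.

```python
class Transform(object):
    """A toy transform: wraps a function applied to a whole list."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Supports the 'label' >> transform syntax.
        return Transform(self.fn, label)


class Collection(object):
    """A toy stand-in for a PCollection holding a plain list."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, transform):
        # collection | transform applies the step and returns a new collection.
        return Collection(transform.fn(self.items))


def Map(fn):
    return Transform(lambda items: [fn(x) for x in items])


def Filter(fn):
    return Transform(lambda items: [x for x in items if fn(x)])


output = (Collection(["a", "bb", "ccc"])
          | "Lengths" >> Map(len)
          | "KeepLong" >> Filter(lambda n: n > 1)
          | "Double" >> Map(lambda n: n * 2))
print(output.items)  # prints [4, 6]
```

Because >> binds more tightly than |, each label attaches to its transform before the transform is applied to the collection on its left.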

In our code we create two custom functions. The regex_clean function scans the data and extracts the matching string for each entry in the PATTERNS list using re.search. The function returns a comma-separated string. If you are not a regular expression expert, I recommend looking at this tutorial and practicing in a notebook to test the code.

After this we define a custom ParDo function called Split, which is a type of Beam transform for parallel processing. In Python this is done in a particular way: we have to create a class that inherits from the Beam DoFn class. The Split function takes the parsed row from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function for it to work. I was getting an import error when it was at the top of the file, which was odd.

This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in batch we read the CSV from src_path using Beam's ReadFromText function.

Batch DataFlow Job (batch processing)
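As a rough illustration of what these two steps do to a single record, here is a cut-down, Beam-free sketch with only four fields. The sample log line and the simplified patterns are assumptions made for this example (the real format comes from generate_log_line in part 1, and the article's PATTERNS list is different); clean and split are hypothetical stand-ins for regex_clean and Split.process.

```python
import re
from datetime import datetime

# Hypothetical sample line; the real format is defined in part 1.
SAMPLE = ('192.168.1.10 - - [30/Apr/2019:21:11:42] "GET /index.html HTTP/1.1" '
          '[200] 4512 "https://www.example.com" "Mozilla/5.0"')

# Simplified patterns written for this sketch, one per output field.
PATTERNS = [
    r'^\S+',                 # remote address: first token on the line
    r'(?<=\[)[^\]]+(?=\])',  # timestamp: text inside the first [...]
    r'(?<=")[A-Z]+(?=\s)',   # request type: capitals right after a quote
    r'(?<=\[)\d{3}(?=\])',   # status: three digits inside [...]
]


def clean(line):
    """Join the first match of each pattern with commas."""
    fields = []
    for pattern in PATTERNS:
        match = re.search(pattern, line)
        # Keep a placeholder so later fields stay at the same index.
        fields.append(match.group() if match else " ")
    return ",".join(fields)


def split(row):
    """Turn the comma-separated row into a one-element list of dicts."""
    element = row.split(",")
    parsed = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
    return [{
        "remote_addr": element[0],
        "timelocal": parsed.strftime("%Y-%m-%d %H:%M:%S"),
        "request_type": element[2],
        "status": element[3],
    }]


row = clean(SAMPLE)
records = split(row)
print(row)
print(records)
```

The position of each pattern in the list decides which index the value lands at after the split, so the list order and the dictionary mapping have to be kept in step.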

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    # One pattern per output field; the order must match the indices used in Split.
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      reg_match = re.search(match, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        # Keep a placeholder so the remaining fields stay at the same index.
        print("There was an error with the regex search")
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

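        # NOTE: the streaming job assigns element[3] and element[4] the other
        # way round; check the mapping against your log format.
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]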

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  # setLevel returns None, so there is nothing useful to assign.
  logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    # One pattern per output field; the order must match the indices used in Split.
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      reg_match = re.search(match, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        # Keep a placeholder so the remaining fields stay at the same index.
        print("There was an error with the regex search")
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

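def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   # parse_known_args returns (known arguments, remaining flags);
   # the remaining flags (--runner, --streaming, ...) are passed on to Beam.
   known_args, pipeline_args = parser.parse_known_args(argv)
   # Fall back to the hard-coded TOPIC when --input_topic is not given.
   input_topic = known_args.input_topic or TOPIC

   p = beam.Pipeline(options=PipelineOptions(pipeline_args))

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)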
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  # setLevel returns None, so there is nothing useful to assign.
  logging.getLogger().setLevel(logging.INFO)
  main()

Running the pipeline

We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal, provided we are logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it using DataFlow. We can do this with the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
  • temp_location - the path to Cloud Dataflow cloud storage for storing temporary job files created while the pipeline runs.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--staging_location gs://$BUCKET/staging \
--streaming
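The commands above mix script-specific arguments such as --input_topic with runner flags such as --runner and --streaming. One common way to separate the two, shown here as a minimal stdlib-only sketch, is argparse's parse_known_args, which returns a pair: the parsed known arguments and a list of the leftover flags, which can then be handed to PipelineOptions. The topic value below is a placeholder, not the project used in this article.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")

# Simulated command line; in a real script this would come from sys.argv.
argv = [
    "--input_topic", "projects/my-project/topics/userlogs",  # placeholder
    "--runner", "DataflowRunner",
    "--streaming",
]

# known_args holds what the parser recognises; pipeline_args holds the rest.
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args.input_topic)
print(pipeline_args)
```

Note that parse_known_args returns a tuple, so assigning its result to a single name and reading attributes from it would fail.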

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in many cases.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a running pipeline with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the command below you should see the first few rows of the dataset. Now that we have data stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

Hopefully this post serves as a useful example of creating a streaming data pipeline, and of finding ways to make data more accessible. Storing the data in this format gives us many benefits. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors happening where there should not be? These are the questions an organization will be interested in. Based on the insights from the answers, we can improve the product and increase user engagement.

Beam is really useful for this type of exercise and has a number of other interesting use cases. For example, you may want to analyse stock tick data in real time and make trades based on the analysis, or maybe you have sensor data coming in from vehicles and want to calculate the level of traffic. You could also, for example, be a games company collecting user data and using it to create dashboards to track key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
