Building a data processing pipeline. Part 2

Hi everyone! We are sharing a translation of the final part of this article, prepared especially for students of the "Data Engineer" course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines

Setting up Google Cloud

Note: I used Google Cloud Shell both to run the pipeline and to publish the user log data, because I had problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which gets along much better with Apache Beam.

To get the pipeline started, we need to dig into the settings a little. For those of you who have never used GCP before, you will need to complete the six steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a walkthrough can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy the files from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console, creating the dataset first. You can follow the steps at the following link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all the columns as strings, apart from the timelocal variable, and name them according to the variables we generated earlier. Our table layout should look like Figure 3 (a scripted alternative is sketched right after the figure).

Figure 3. Table layout
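If you would rather script this step than click through the console, the dataset and table can also be created with the google.cloud.bigquery client library, which our pipeline code imports anyway. The following is only a minimal sketch, not the article's method: it mirrors the all-string schema passed to WriteToBigQuery later on and reuses the project, dataset, and table names from the code below.

from google.cloud import bigquery

# Hypothetical scripted alternative to the console steps described above.
client = bigquery.Client(project="user-logs-237110")

# Create the dataset the pipeline will write to.
client.create_dataset("userlogs", exists_ok=True)

# Mirror the schema string used by WriteToBigQuery below (all STRING columns).
schema = [
    bigquery.SchemaField(name, "STRING")
    for name in ["remote_addr", "timelocal", "request_type", "status",
                 "body_bytes_sent", "http_referer", "http_user_agent"]
]

table = bigquery.Table("user-logs-237110.userlogs.logdata", schema=schema)
client.create_table(table, exists_ok=True)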

Publishing the user log data

Pub/Sub is a vital component of our pipeline, since it allows multiple independent applications to interact with each other. Specifically, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
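The topic can also be created programmatically instead of through the console. This is a hedged sketch assuming the pre-2.0 google-cloud-pubsub API (the one our Python 2 Cloud Shell setup implies); the project ID and topic name match the publisher script below.

from google.cloud import pubsub_v1

# Hypothetical equivalent of clicking CREATE TOPIC in the console.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("user-logs-237110", "userlogs")
publisher.create_topic(topic_path)  # raises AlreadyExists if the topic exists
print("Created topic: {}".format(topic_path))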

The code below calls our script that generates the log data described above, then connects to Pub/Sub and sends the logs there. All we need to do is create a PublisherClient object, build the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure those files are in the same folder or you will get an import error. We can then run this from our Google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub expects the message payload as bytes.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we press CTRL+C to stop it.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that everything is set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object has been created, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the image below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
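To make this pattern concrete before we get to the real code, here is a tiny self-contained Beam job, not part of the article's pipeline, in which the input rows and step names are made up; it chains a few transforms with the pipe operator and runs locally on the direct runner.

from __future__ import print_function
import apache_beam as beam

# Toy pipeline: create a few fake CSV rows, split them, keep only GET requests.
with beam.Pipeline() as p:
    (p
     | 'CreateInput' >> beam.Create(['200,GET', '404,POST', '301,GET'])
     | 'SplitFields' >> beam.Map(lambda line: line.split(','))
     | 'KeepGets' >> beam.Filter(lambda fields: fields[1] == 'GET')
     | 'PrintRows' >> beam.Map(print))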

In our code, we create two custom functions. The regex_clean function scans the data and, for each pattern in the PATTERNS list, extracts the matching string using the re.search function; the function returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at this tutorial and practicing in a notepad to check the code (there is also a short standalone illustration right after this paragraph). After that, we define a custom ParDo function called Split, which is Beam's transform for parallel processing. In Python this is done in a special way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed string from the previous step and returns a list of dictionaries whose keys match the column names of our BigQuery table. One thing worth noting about this function: I had to import datetime inside the function to make it work, since an import at the top of the file produced an import error, which was strange. The resulting list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch job we read a CSV from src_path using Beam's ReadFromText function.
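As a quick standalone illustration of the re.search(...).group() call that regex_clean relies on (the log line below is invented and only approximates the format our generator produces):

import re

# A made-up log line roughly in the format generate_log_line produces.
line = '192.168.0.1 - - [01/Jan/2019:12:00:00] "GET /index.html HTTP/1.0" 200 512'

# r'\s(\d+)\s' finds the first run of digits surrounded by whitespace,
# which in this line is the HTTP status code.
match = re.search(r'\s(\d+)\s', line)
if match:
    print(match.group())   # ' 200 ' - the full match, whitespace included
    print(match.group(1))  # '200' - just the captured digits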

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Launching the pipeline

We can launch the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged into GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it on DataFlow. We can do so with the command below, setting the following required parameters.

  • project - your GCP project ID.
  • runner - the pipeline runner that analyzes your program and constructs your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
  • temp_location - the path to Cloud Dataflow cloud storage for storing temporary job files created while the pipeline runs.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging, it can be very helpful to go to Logs and then on to Stackdriver to view the detailed logs. This has helped me sort out pipeline problems on a number of occasions.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline running, with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery
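This check can also be scripted. Below is a minimal sketch, assuming the google.cloud.bigquery client library and reusing the table name from the query above.

from google.cloud import bigquery

# Hypothetical scripted version of the ad-hoc console query above.
client = bigquery.Client(project="user-logs-237110")
query = "SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10"

for row in client.query(query).result():  # blocks until the job finishes
    print(dict(row))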

Conclusion

We hope this article serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions like: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And are there errors where there should not be any? These are the questions that will interest the organization. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.

Beam is genuinely useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis, or you might have sensor data arriving from vehicles and want to compute traffic levels. You could also, say, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay folks, that is a topic for another post. Thanks for reading! For those who would like to see the full code, here is the link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all for now. Read the first part.

Source: www.habr.com
