Em boriyek hilberandina daneya herikînê diafirînin. Beş 2

Silav hemû. Em wergera beşa dawî ya gotarê, ku bi taybetî ji bo xwendekarên qursê hatî amadekirin, parve dikin. "Endezyar Daneyên". Hûn dikarin beşa yekem bixwînin vir.

Apache Beam û DataFlow ji bo Pipelines Real-Time

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2

Sazkirina Google Cloud

Nîşe: Min Google Cloud Shell bikar anî da ku boriyê bimeşîne û daneyên têketinê yên xwerû biweşîne ji ber ku min tengasiya xebitandina boriyê di Python 3-ê de dikişand. Google Cloud Shell Python 2 bikar tîne, ku bi Apache Beam re hevahengtir e.

Ji bo destpêkirina boriyê, pêdivî ye ku em hinekî li mîhengan bikolin. Ji bo we yên ku berê GCP bikar neanîne, hûn ê hewce bikin ku 6 gavên jêrîn ên ku di vê yekê de hatine destnîşan kirin bişopînin rûpel.

Piştî vê yekê, em ê hewce bikin ku nivîsarên xwe li Google Cloud Storage bar bikin û wan li Google Cloud Shel-a xwe kopî bikin. Barkirina li hilanîna ewr pir hindik e (navokek dikare were dîtin vir). Ji bo kopîkirina pelên me, em dikarin Google Cloud Shel ji darikê amûran vekin bi tikandina yekem îkonê li milê çepê di xêza 2-a jêrîn de.

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2
Hêjmar 2

Fermanên ku em hewce ne ku pelan kopî bikin û pirtûkxaneyên pêwîst saz bikin li jêr têne rêz kirin.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Afirandina databas û tabloya me

Piştî ku me hemî gavên têkildarî sazkirinê qedandin, tiştê din ku divê em bikin ev e ku di BigQuery de danehev û tabloyek çêbikin. Gelek awayên kirina vê yekê hene, lê ya herî hêsan ev e ku meriv konsolê Google Cloud-ê bikar bîne û pêşî danûstendinek biafirîne. Hûn dikarin gavên jêrîn bişopînin linkji bo afirandina tabloyek bi şema. Maseya me dê hebe 7 stûn, bi pêkhateyên her têketinek bikarhêner re têkildar e. Ji bo rehetiyê, em ê ji bilî guhêrbara herêmî ya demkî, hemî stûnan wekî rêzan pênase bikin, û li gorî guhêrbarên ku me berê çêkirine navên wan bidin. Plansaziya tabloya me divê wekî di jimar 3 de xuya bike.

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2
Wêne 3. Çêkirina maseyê

Weşandina daneyên têketina bikarhêner

Pub/Sub hêmanek krîtîk a xeta me ye ji ber ku ew dihêle ku gelek serîlêdanên serbixwe bi hevûdu re têkilî daynin. Bi taybetî, ew wekî navbeynkarek dixebite ku destûrê dide me ku em di navbera serlêdanan de peyaman bişînin û bistînin. Yekem tiştê ku divê em bikin ev e ku mijarek çêbikin. Tenê di konsolê de biçin Pub/Sub û bikirtînin CREATE TOPIC.

Koda li jêr gazî skrîpta me dike da ku daneya têketinê ya ku li jor hatî destnîşan kirin biafirîne û dûv re têketinan bi Pub/Sub ve girêdide û dişîne. Tişta ku divê em bikin ev e ku meriv biafirîne PublisherClient, rêya mijarê bi bikaranîna rêbazê diyar bike topic_path û fonksiyonê bang bikin publish с topic_path û daneyan. Ji kerema xwe not bikin ku em import dikin generate_log_line ji nivîsara me stream_logs, ji ber vê yekê piştrast bikin ku ev pel di heman peldankê de ne, wekî din hûn ê xeletiyek importê bistînin. Dûv re em dikarin vê bi konsolê xweya google-ê bi karanîna:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Hema ku pel dimeşe, em ê bikaribin wekî ku di jimareya jêrîn de tê xuyang kirin, derketina daneyên têketinê li konsolê bibînin. Heya ku em bikar neynin ev skrîpt dê bixebite CTRL + Cji bo temamkirina wê.

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2
jimar 4. Derketin publish_logs.py

Koda meya boriyê dinivîse

Naha ku me her tişt amade kiriye, em dikarin beşa kêfê dest pê bikin - kodkirina lûleya xwe bi karanîna Beam û Python. Ji bo çêkirina boriyeke Beam, divê em babeteke boriyê (p) biafirînin. Dema ku me tiştek boriyê çêkir, em dikarin bi karanîna operatorê ve gelek fonksiyonan yek li pey hev bicîh bikin pipe (|). Bi gelemperî, xebata xebatê wekî wêneya jêrîn xuya dike.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Di koda me de, em ê du fonksiyonên xwerû biafirînin. Karkirin regex_clean, ku daneyan dikole û rêza têkildar li ser bingeha navnîşa PATTERNS bi karanîna fonksiyonê vedigire re.search. Fonksiyon rêzikek ji hev veqetandî vedigerîne. Heke hûn ne pisporê vegotina birêkûpêk in, ez pêşniyar dikim ku vê yekê kontrol bikin tutorial û di notepadê de pratîk bikin da ku kodê kontrol bikin. Piştî vê yekê em fonksiyonek ParDo-ya xwerû ya ku jê re tê gotin diyar dikin Qelişandin, ku guhertoyek veguherîna Beam ji bo pêvajoyek paralel e. Di Python de, ev bi rengek taybetî tête kirin - divê em çînek ku ji çîna DoFn Beam mîras digire biafirînin. Fonksiyona Split rêza parkirî ji fonksiyona berê digire û navnîşek ferhengan bi bişkojkên ku bi navên stûnên tabloya meya BigQuery re têkildar in vedigerîne. Di derbarê vê fonksiyonê de tiştek heye ku bala xwe bide: Diviya bû ku ez import bikim datetime di hundurê fonksiyonek de da ku ew bixebite. Min di destpêka pelê de xeletiyek importê digirt, ku ecêb bû. Dûv re ev navnîş ji fonksiyonê re derbas dibe WriteToBigQuery, ku bi tenê daneyên me li ser sifrê zêde dike. Koda ji bo Batch DataFlow Job û Streaming DataFlow Job li jêr tê dayîn. Cudahiya yekane di navbera koda hevîrê û vekêşanê de ev e ku em CSV-ê ji hev dixwînin src_pathfonksiyonê bikar tîne ReadFromText ji Beam.

Karê DataFlow Batch (pêvajoya hevîrê)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Karê Daneya Flow Streaming (Pêvajoya herikandinê)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Veguheztinê dest pê dike

Em dikarin boriyê bi çend awayên cihê bimeşînin. Ger me bixwesta, dema ku ji dûr ve têketin GCP-ê, em dikaribûn wê ji termînalekê herêmî bimeşînin.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Lêbelê, em ê wê bi karanîna DataFlow bimeşînin. Em dikarin vê yekê bi karanîna fermana jêrîn bi danîna pîvanên pêwîst ên jêrîn bikin.

  • project - Nasnameya projeya weya GCP.
  • runner rêvekerek boriyê ye ku dê bernameya we analîz bike û xeta boriyê we ava bike. Ji bo ku hûn di ewr de bixebitin, divê hûn DataflowRunnerek diyar bikin.
  • staging_location - Rêya hilanîna cloudê ya Cloud Dataflow ji bo navnîşkirina pakêtên kodê yên ku ji hêla pêvajoyên ku xebatê pêk tînin hewce ne.
  • temp_location - Rêya hilanîna cloudê Cloud Dataflow ji bo hilanîna pelên kar ên demkî yên ku dema ku boriyê dimeşe hatine çêkirin.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Dema ku ev ferman dimeşe, em dikarin biçin tabloya DataFlow ya di konsolê google de û xeta xweya xwe bibînin. Dema ku em li ser boriyê bitikînin, divê em tiştek dişibihe Wêne 4-ê bibînin. Ji bo mebestên debugkirinê, ew dikare pir arîkar be ku biçin Logs û dûv re jî li Stackdriver-ê ku têketinên berfireh bibînin. Vê yekê di gelek rewşan de alîkariya min kir ku pirsgirêkên boriyê çareser bikim.

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2
Şikil 4: Veguhastina tîrêjê

Di BigQuery de bigihîjin daneyên me

Ji ber vê yekê, divê em berê xwedan boriyek hebe ku daneyên ku di tabloya me de diherike. Ji bo ceribandina vê, em dikarin biçin BigQuery û li daneyan binihêrin. Piştî ku emrê jêrîn bikar bînin, divê hûn çend rêzên pêşîn ên daneyê bibînin. Naha ku daneyên me li BigQuery hatine hilanîn, em dikarin analîzek din bikin, û hem jî daneyan bi hevkaran re parve bikin û dest bi bersivandina pirsên karsaziyê bikin.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Em boriyek hilberandina daneya herikînê diafirînin. Beş 2
Figure 5: BigQuery

encamê

Em hêvî dikin ku ev post wekî mînakek bikêr e ji bo afirandina boriyek daneya veguhêz, û her weha peydakirina awayên ku daneyan bigihînin zêdetir bike. Hilberîna daneyan di vê formatê de gelek avantajan dide me. Naha em dikarin dest bi bersivandina pirsên girîng bikin ka çend kes hilberê me bikar tînin? Bingeha bikarhênerê we bi demê re mezin dibe? Mirov herî zêde bi kîjan aliyên hilberê re têkilî dike? Û gelo xeletî hene ku lê nebin? Ev pirsên ku dê ji bo rêxistinê balkêş in. Li ser bingeha têgihiştinên ku ji bersivên van pirsan derdikevin, em dikarin hilberê çêtir bikin û tevlêbûna bikarhêner zêde bikin.

Beam bi rastî ji bo vê celebê werzîşê bikêr e û gelek rewşên karanîna balkêş ên din jî hene. Mînakî, dibe ku hûn bixwazin di demek rast de daneyên tikandina stock analîz bikin û li ser bingeha analîzê bazirganiyê bikin, dibe ku we daneyên sensor ên ku ji wesayîtan têne hene û dixwazin hesabên asta trafîkê hesab bikin. Wekî mînak, hûn dikarin bibin pargîdaniyek lîstikê ku daneyên bikarhêner berhev dike û wê bikar tîne da ku tabloyan biafirîne da ku pîvanên sereke bişopîne. Baş e, birêzan, ev mijarek ji bo postek din e, spas ji bo xwendinê, û ji bo kesên ku dixwazin koda tevahî bibînin, li jêr lînka GitHub-a min heye.

https://github.com/DFoly/User_log_pipeline

Navê pêger. Beşa yekê bixwînin.

Source: www.habr.com

Add a comment