Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2

Hi kollha. Qed naqsmu t-traduzzjoni tal-parti finali tal-artiklu, ippreparata speċifikament għall-istudenti tal-kors. Inġinier tad-Data. Tista' taqra l-ewwel parti hawn.

Apache Beam u DataFlow għal Pipelines f'ħin reali

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2

Twaqqif ta' Google Cloud

Nota: Jien użajt Google Cloud Shell biex tħaddem il-pipeline u nippubblika dejta tal-ġurnal tad-dwana għax kont qed ikolli problemi biex inħaddem il-pipeline f'Python 3. Google Cloud Shell juża Python 2, li huwa aktar konsistenti ma 'Apache Beam.

Biex nibdew il-pipeline, irridu nħaffru ftit fis-settings. Għal dawk minnkom li ma użawx GCP qabel, ser ikollok bżonn issegwi s-6 passi li ġejjin deskritti f'dan paġna.

Wara dan, ikollna bżonn intellgħu l-iskripts tagħna fuq Google Cloud Storage u nikkopjahom fil-Google Cloud Shel tagħna. It-tlugħ fil-ħażna tal-cloud huwa pjuttost trivjali (tista' ssib deskrizzjoni hawn). Biex tikkopja l-fajls tagħna, nistgħu niftħu Google Cloud Shel mill-toolbar billi nikklikkjaw l-ewwel ikona fuq ix-xellug fil-Figura 2 hawn taħt.

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2
Figura 2

Il-kmandi li għandna bżonn nikkopjaw il-fajls u ninstallaw il-libreriji meħtieġa huma elenkati hawn taħt.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Noħolqu database u tabella tagħna

Ladarba nkunu lestejna l-passi kollha relatati mas-setup, il-ħaġa li jmiss li rridu nagħmlu hija li noħolqu dataset u tabella f'BigQuery. Hemm diversi modi kif tagħmel dan, iżda l-aktar sempliċi huwa li tuża l-console Google Cloud billi l-ewwel toħloq sett ta 'dejta. Tista 'ssegwi l-passi hawn taħt rabtabiex toħloq tabella bi schema. Il-mejda tagħna se jkollha 7 kolonni, li jikkorrispondi għall-komponenti ta 'kull log tal-utent. Għall-konvenjenza, aħna se niddefinixxu l-kolonni kollha bħala kordi, ħlief għall-varjabbli timelocal, u nsemmuhom skont il-varjabbli li ġġenerajna qabel. It-tqassim tat-tabella tagħna għandu jidher bħal fil-Figura 3.

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2
Figura 3. Tqassim tal-mejda

Il-pubblikazzjoni tad-dejta tal-log tal-utent

Pub/Sub huwa komponent kritiku tal-pipeline tagħna għaliex jippermetti applikazzjonijiet indipendenti multipli jikkomunikaw ma 'xulxin. B'mod partikolari, jaħdem bħala intermedjarju li jippermettilna nibagħtu u nirċievu messaġġi bejn l-applikazzjonijiet. L-ewwel ħaġa li rridu nagħmlu hija li noħolqu suġġett. Sempliċement mur Pub/Sub fil-console u kklikkja OĦLOQ SUĠĠETT.

Il-kodiċi hawn taħt isejjaħ l-iskrittura tagħna biex jiġġenera d-dejta tal-log definita hawn fuq u mbagħad jgħaqqad u jibgħat ir-zkuk lil Pub/Sub. L-unika ħaġa li rridu nagħmlu hija li noħolqu oġġett PublisherClient, speċifika t-triq għas-suġġett billi tuża l-metodu topic_path u sejjaħ il-funzjoni publish с topic_path u data. Jekk jogħġbok innota li aħna importazzjoni generate_log_line mill-iskrittura tagħna stream_logs, sabiex kun żgur li dawn il-fajls huma fl-istess folder, inkella inti se tikseb żball ta 'importazzjoni. Nistgħu mbagħad inħaddmu dan permezz tal-console google tagħna billi tuża:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Hekk kif il-fajl jibda, inkunu nistgħu naraw l-output tad-dejta tal-log lill-console, kif muri fil-figura hawn taħt. Din l-iskrittura se taħdem sakemm ma nużawx CTRL + Ċbiex tlestiha.

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2
Figura 4. Output publish_logs.py

Nikteb il-kodiċi tal-pipeline tagħna

Issa li għandna kollox ippreparat, nistgħu nibdew il-parti divertenti - nikkodifikaw il-pipeline tagħna billi tuża Beam u Python. Biex noħolqu pipeline Beam, għandna bżonn noħolqu oġġett pipeline (p). Ladarba ħloqna oġġett tal-pipeline, nistgħu napplikaw funzjonijiet multipli wieħed wara l-ieħor bl-użu tal-operatur pipe (|). B'mod ġenerali, il-fluss tax-xogħol jidher bħall-immaġni hawn taħt.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Fil-kodiċi tagħna, aħna se noħolqu żewġ funzjonijiet tad-dwana. Funzjoni regex_clean, li tiskennja d-data u tirkupra r-ringiela korrispondenti bbażata fuq il-lista PATTERNS bl-użu tal-funzjoni re.search. Il-funzjoni tirritorna string separata bil-virgola. Jekk m'intix espert tal-espressjoni regolari, nirrakkomanda li tiċċekkja dan tutorja u prattika f'notepad biex tiċċekkja l-kodiċi. Wara dan aħna niddefinixxu funzjoni personalizzata ParDo imsejħa Split, li hija varjazzjoni tat-trasformazzjoni Beam għall-ipproċessar parallel. F'Python, dan isir b'mod speċjali - irridu noħolqu klassi li tirret mill-klassi DoFn Beam. Il-funzjoni Split tieħu r-ringiela parsed mill-funzjoni preċedenti u tirritorna lista ta 'dizzjunarji b'ċwievet li jikkorrispondu għall-ismijiet tal-kolonni fit-tabella BigQuery tagħna. Hemm xi ħaġa li wieħed jinnota dwar din il-funzjoni: kelli jimporta datetime ġewwa funzjoni biex din taħdem. Kont qed inġib żball ta' importazzjoni fil-bidu tal-fajl, li kien stramb. Din il-lista mbagħad tiġi mgħoddija lill-funzjoni WriteToBigQuery, li sempliċement iżid id-dejta tagħna mat-tabella. Il-kodiċi għal Batch DataFlow Job u Streaming DataFlow Job huwa mogħti hawn taħt. L-unika differenza bejn il-lott u l-kodiċi streaming hija li fil-lott naqraw is-CSV minn src_pathbl-użu tal-funzjoni ReadFromText minn Beam.

Batch DataFlow Job (ipproċessar tal-lott)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Xogħol ta' Streaming DataFlow (ipproċessar ta' fluss)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Jibda l-conveyor

Nistgħu nħaddmu l-pipeline b'diversi modi differenti. Kieku ridna, nistgħu sempliċement inħaddmuh lokalment minn terminal waqt li nilloggjaw fil-GCP mill-bogħod.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Madankollu, se nħaddmuha bl-użu tad-DataFlow. Nistgħu nagħmlu dan billi tuża l-kmand hawn taħt billi nissettjaw il-parametri meħtieġa li ġejjin.

  • project — ID tal-proġett GCP tiegħek.
  • runner huwa pipeline runner li se janalizza l-programm tiegħek u jibni pipeline tiegħek. Biex taħdem fis-sħaba, trid tispeċifika DataflowRunner.
  • staging_location — il-mogħdija għall-ħażna tal-cloud tal-Cloud Dataflow għall-indiċjar tal-pakketti tal-kodiċi meħtieġa mill-proċessuri li jwettqu x-xogħol.
  • temp_location — mogħdija għall-ħażna tal-cloud tal-Cloud Dataflow għall-ħażna ta’ fajls tax-xogħol temporanji maħluqa waqt li l-pipeline ikun qed jaħdem.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Waqt li dan il-kmand ikun qed jaħdem, nistgħu mmorru fit-tab DataFlow fil-google console u naraw il-pipeline tagħna. Meta nikklikkjaw fuq il-pipeline, għandna naraw xi ħaġa simili għal Figura 4. Għal skopijiet ta 'debugging, jista' jkun ta 'għajnuna kbira li tmur għal Zkuk u mbagħad għal Stackdriver biex tara r-zkuk dettaljati. Dan għenni nsolvi kwistjonijiet ta' pipeline f'numru ta' każijiet.

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2
Figura 4: Beam conveyor

Aċċessa d-dejta tagħna fi BigQuery

Għalhekk, għandu diġà jkollna pipeline li jaħdem bid-dejta li tiċċirkola fit-tabella tagħna. Biex nittestjaw dan, nistgħu mmorru BigQuery u nħarsu lejn id-dejta. Wara li tuża l-kmand hawn taħt għandek tara l-ewwel ftit ringieli tad-dataset. Issa li għandna d-dejta maħżuna f'BigQuery, nistgħu nwettqu aktar analiżi, kif ukoll naqsmu d-dejta mal-kollegi u nibdew inwieġbu mistoqsijiet tan-negozju.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Aħna noħolqu pipeline tal-ipproċessar tad-dejta tan-nixxiegħa. Parti 2
Figura 5: BigQuery

Konklużjoni

Nittamaw li din il-kariga sservi bħala eżempju utli tal-ħolqien ta 'pipeline tad-dejta streaming, kif ukoll li jinstabu modi biex id-dejta tkun aktar aċċessibbli. Il-ħażna tad-dejta f'dan il-format tagħtina ħafna vantaġġi. Issa nistgħu nibdew inwieġbu mistoqsijiet importanti bħal kemm nies jużaw il-prodott tagħna? Il-bażi tal-utenti tiegħek qed tikber maż-żmien? In-nies b'liema aspetti tal-prodott jinteraġixxu l-aktar? U hemm żbalji fejn m'għandux ikun hemm? Dawn huma l-mistoqsijiet li se jkunu ta’ interess għall-organizzazzjoni. Ibbażat fuq l-għarfien li joħroġ mit-tweġibiet għal dawn il-mistoqsijiet, nistgħu ntejbu l-prodott u nżidu l-involviment tal-utent.

Beam huwa verament utli għal dan it-tip ta 'eżerċizzju u għandu numru ta' każijiet ta 'użu interessanti oħra wkoll. Pereżempju, tista 'tixtieq tanalizza d-dejta tal-qurdien tal-istokk f'ħin reali u tagħmel kummerċ ibbażat fuq l-analiżi, forsi għandek dejta tas-sensorju li ġejja minn vetturi u trid tikkalkula l-kalkoli tal-livell tat-traffiku. Tista' wkoll, pereżempju, tkun kumpanija tal-logħob li tiġbor data tal-utent u tużaha biex toħloq dashboards biex issegwi metriċi ewlenin. Tajjeb, rġulija, dan huwa suġġett għal post ieħor, grazzi tal-qari, u għal dawk li jixtiequ jaraw il-kodiċi sħiħ, hawn taħt hemm il-link għal GitHub tiegħi.

https://github.com/DFoly/User_log_pipeline

Dak kollox. Aqra l-ewwel parti.

Sors: www.habr.com

Żid kumment