Rydym yn creu llif llif prosesu data. Rhan 2

Helo i gyd. Rydym yn rhannu cyfieithiad o ran olaf yr erthygl, a baratowyd yn benodol ar gyfer myfyrwyr y cwrs. Peiriannydd Data. Gallwch ddarllen y rhan gyntaf yma.

Apache Beam a DataFlow ar gyfer Piblinellau Amser Real

Rydym yn creu llif llif prosesu data. Rhan 2

Sefydlu Google Cloud

Nodyn: Defnyddiais Google Cloud Shell i redeg y biblinell a chyhoeddi data log arferol oherwydd fy mod yn cael trafferth rhedeg y biblinell yn Python 3. Mae Google Cloud Shell yn defnyddio Python 2, sy'n fwy cyson ag Apache Beam.

I ddechrau'r biblinell, mae angen inni gloddio ychydig i'r gosodiadau. I'r rhai ohonoch nad ydych wedi defnyddio GCP o'r blaen, bydd angen i chi ddilyn y 6 cham canlynol a amlinellir yn hwn tudalen.

Ar Γ΄l hyn, bydd angen i ni uwchlwytho ein sgriptiau i Google Cloud Storage a'u copΓ―o i'n Google Cloud Shel. Mae uwchlwytho i storfa cwmwl yn eithaf dibwys (gellir dod o hyd i ddisgrifiad yma). I gopΓ―o ein ffeiliau, gallwn agor Google Cloud Shel o'r bar offer trwy glicio ar yr eicon cyntaf ar y chwith yn Ffigur 2 isod.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 2

Mae'r gorchmynion sydd eu hangen arnom i gopΓ―o'r ffeiliau a gosod y llyfrgelloedd gofynnol wedi'u rhestru isod.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creu ein cronfa ddata a thabl

Unwaith y byddwn wedi cwblhau'r holl gamau sy'n gysylltiedig Γ’ gosod, y peth nesaf y mae angen i ni ei wneud yw creu set ddata a thabl yn BigQuery. Mae yna sawl ffordd o wneud hyn, ond y symlaf yw defnyddio consol Google Cloud trwy greu set ddata yn gyntaf. Gallwch ddilyn y camau isod cyswllti greu bwrdd gyda sgema. Bydd gan ein bwrdd 7 colofn, sy'n cyfateb i gydrannau pob log defnyddiwr. Er hwylustod, byddwn yn diffinio pob colofn fel llinynnau, ac eithrio'r newidyn timelocal, a'u henwi yn Γ΄l y newidynnau a gynhyrchwyd gennym yn gynharach. Dylai cynllun ein bwrdd edrych fel yn Ffigur 3.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 3. Cynllun y tabl

Cyhoeddi data log defnyddwyr

Mae Pub/Sub yn elfen hanfodol o'n piblinell oherwydd ei fod yn caniatΓ‘u i gymwysiadau annibynnol lluosog gyfathrebu Γ’'i gilydd. Yn benodol, mae'n gweithio fel cyfryngwr sy'n ein galluogi i anfon a derbyn negeseuon rhwng ceisiadau. Y peth cyntaf sydd angen i ni ei wneud yw creu pwnc. Yn syml, ewch i Pub/Sub yn y consol a chliciwch CREATE TOPIC.

Mae'r cod isod yn galw ein sgript i gynhyrchu'r data log a ddiffinnir uchod ac yna'n cysylltu ac yn anfon y logiau i Pub / Sub. Yr unig beth sydd angen i ni ei wneud yw creu gwrthrych CyhoeddwrClient, nodwch y llwybr i'r pwnc gan ddefnyddio'r dull topic_path a ffoniwch y swyddogaeth publish с topic_path a data. Sylwch ein bod yn mewnforio generate_log_line o'n sgript stream_logs, felly gwnewch yn sišr bod y ffeiliau hyn yn yr un ffolder, fel arall fe gewch wall mewnforio. Yna gallwn redeg hwn trwy ein consol google gan ddefnyddio:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Cyn gynted ag y bydd y ffeil yn rhedeg, byddwn yn gallu gweld allbwn y data log i'r consol, fel y dangosir yn y ffigur isod. Bydd y sgript hon yn gweithio cyn belled nad ydym yn ei defnyddio CTRL + Ci'w gwblhau.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 4. Allbwn publish_logs.py

Ysgrifennu ein cod piblinell

Nawr bod gennym bopeth wedi'i baratoi, gallwn ddechrau'r rhan hwyliog - codio ein piblinell gan ddefnyddio Beam a Python. I greu piblinell Beam, mae angen i ni greu gwrthrych piblinell (p). Unwaith y byddwn wedi creu gwrthrych piblinell, gallwn gymhwyso swyddogaethau lluosog un ar Γ΄l y llall gan ddefnyddio'r gweithredwr pipe (|). Yn gyffredinol, mae'r llif gwaith yn edrych fel y ddelwedd isod.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Yn ein cod, byddwn yn creu dwy swyddogaeth arferiad. Swyddogaeth regex_clean, sy'n sganio'r data ac yn adfer y rhes gyfatebol yn seiliedig ar y rhestr PATTERNS gan ddefnyddio'r swyddogaeth re.search. Mae'r ffwythiant yn dychwelyd llinyn wedi'i wahanu gan goma. Os nad ydych chi'n arbenigwr mynegiant rheolaidd, rwy'n argymell gwirio hyn tiwtorial ac ymarferwch mewn llyfr nodiadau i wirio'r cod. Ar Γ΄l hyn rydym yn diffinio swyddogaeth ParDo arferol o'r enw Hollti, sy'n amrywiad o'r trawsnewid Beam ar gyfer prosesu cyfochrog. Yn Python, gwneir hyn mewn ffordd arbennig - rhaid creu dosbarth sy'n etifeddu o ddosbarth DoFn Beam. Mae'r ffwythiant Hollti yn cymryd y rhes wedi'i dosrannu o'r ffwythiant blaenorol ac yn dychwelyd rhestr o eiriaduron gydag allweddi sy'n cyfateb i enwau'r colofnau yn ein tabl BigQuery. Mae rhywbeth i'w nodi am y swyddogaeth hon: roedd yn rhaid i mi fewnforio datetime tu mewn i swyddogaeth i wneud iddo weithio. Roeddwn yn cael gwall mewnforio ar ddechrau'r ffeil, a oedd yn rhyfedd. Yna mae'r rhestr hon yn cael ei throsglwyddo i'r swyddogaeth WriteToBigQuery, sydd yn syml yn ychwanegu ein data at y tabl. Rhoddir y cod ar gyfer Swp DataFlow Job a Streaming DataFlow Job isod. Yr unig wahaniaeth rhwng swp a chod ffrydio yw ein bod yn darllen y CSV ohono mewn swp src_pathdefnyddio'r swyddogaeth ReadFromText o Beam.

Swydd Swp DataFlow (prosesu swp)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ffrydio DataFlow Job (prosesu ffrwd)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Cychwyn y cludwr

Gallwn redeg y biblinell mewn sawl ffordd wahanol. Pe baem yn dymuno, gallem ei redeg yn lleol o derfynell wrth fewngofnodi i GCP o bell.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Fodd bynnag, rydyn ni'n mynd i'w redeg gan ddefnyddio DataFlow. Gallwn wneud hyn gan ddefnyddio'r gorchymyn isod trwy osod y paramedrau gofynnol canlynol.

  • project β€” ID eich prosiect GCP.
  • runner yn rhedwr piblinell a fydd yn dadansoddi eich rhaglen ac yn adeiladu eich piblinell. I redeg yn y cwmwl, rhaid i chi nodi DataflowRunner.
  • staging_location - y llwybr i storfa cwmwl Cloud Dataflow ar gyfer mynegeio pecynnau cod sydd eu hangen ar y proseswyr sy'n cyflawni'r gwaith.
  • temp_location - llwybr i storfa cwmwl Cloud Dataflow ar gyfer storio ffeiliau swyddi dros dro a grΓ«wyd tra bod y biblinell yn rhedeg.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Tra bod y gorchymyn hwn yn rhedeg, gallwn fynd i'r tab DataFlow yn y consol google a gweld ein piblinell. Pan fyddwn yn clicio ar y biblinell, dylem weld rhywbeth tebyg i Ffigur 4. At ddibenion dadfygio, gall fod yn ddefnyddiol iawn mynd i Logs ac yna i Stackdriver i weld y logiau manwl. Mae hyn wedi fy helpu i ddatrys problemau piblinellau mewn nifer o achosion.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 4: Cludwr trawst

Cyrchwch ein data yn BigQuery

Felly, dylai fod gennym eisoes biblinell yn rhedeg gyda data yn llifo i'n tabl. I brofi hyn, gallwn fynd i BigQuery ac edrych ar y data. Ar Γ΄l defnyddio'r gorchymyn isod dylech weld yr ychydig resi cyntaf o'r set ddata. Nawr bod gennym y data wedi'i storio yn BigQuery, gallwn gynnal dadansoddiad pellach, yn ogystal Γ’ rhannu'r data gyda chydweithwyr a dechrau ateb cwestiynau busnes.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 5: BigQuery

Casgliad

Gobeithiwn y bydd y swydd hon yn enghraifft ddefnyddiol o greu piblinell ddata ffrydio, yn ogystal Γ’ dod o hyd i ffyrdd o wneud data yn fwy hygyrch. Mae storio data yn y fformat hwn yn rhoi llawer o fanteision i ni. Nawr gallwn ddechrau ateb cwestiynau pwysig fel faint o bobl sy'n defnyddio ein cynnyrch? A yw eich sylfaen defnyddwyr yn tyfu dros amser? Pa agweddau ar y cynnyrch y mae pobl yn rhyngweithio Γ’ nhw fwyaf? Ac a oes gwallau lle na ddylai fod? Dyma’r cwestiynau a fydd o ddiddordeb i’r sefydliad. Yn seiliedig ar y mewnwelediadau sy'n deillio o'r atebion i'r cwestiynau hyn, gallwn wella'r cynnyrch a chynyddu ymgysylltiad defnyddwyr.

Mae Beam yn ddefnyddiol iawn ar gyfer y math hwn o ymarfer corff ac mae ganddo nifer o achosion defnydd diddorol eraill hefyd. Er enghraifft, efallai y byddwch am ddadansoddi data ticio stoc mewn amser real a gwneud crefftau yn seiliedig ar y dadansoddiad, efallai bod gennych ddata synhwyrydd yn dod o gerbydau ac eisiau cyfrifo cyfrifiadau lefel traffig. Gallech hefyd, er enghraifft, fod yn gwmni hapchwarae sy'n casglu data defnyddwyr ac yn ei ddefnyddio i greu dangosfyrddau i olrhain metrigau allweddol. Iawn, foneddigion, mae hwn yn bwnc ar gyfer post arall, diolch am ddarllen, ac i'r rhai sydd am weld y cod llawn, isod mae'r ddolen i fy GitHub.

https://github.com/DFoly/User_log_pipeline

Dyna i gyd. Darllenwch ran un.

Ffynhonnell: hab.com

Ychwanegu sylw