Rydym yn creu llif llif prosesu data. Rhan 2

Helo i gyd. Rydym yn rhannu cyfieithiad o ran olaf yr erthygl, a baratowyd yn benodol ar gyfer myfyrwyr y cwrs. Peiriannydd Data. Gallwch ddarllen y rhan gyntaf yma.

Apache Beam a DataFlow ar gyfer Piblinellau Amser Real

Rydym yn creu llif llif prosesu data. Rhan 2

Sefydlu Google Cloud

Nodyn: Defnyddiais Google Cloud Shell i redeg y biblinell a chyhoeddi data log arferol oherwydd fy mod yn cael trafferth rhedeg y biblinell yn Python 3. Mae Google Cloud Shell yn defnyddio Python 2, sy'n fwy cyson ag Apache Beam.

I ddechrau'r biblinell, mae angen inni gloddio ychydig i'r gosodiadau. I'r rhai ohonoch nad ydych wedi defnyddio GCP o'r blaen, bydd angen i chi ddilyn y 6 cham canlynol a amlinellir yn hwn tudalen.

Ar Γ΄l hyn, bydd angen i ni uwchlwytho ein sgriptiau i Google Cloud Storage a'u copΓ―o i'n Google Cloud Shel. Mae uwchlwytho i storfa cwmwl yn eithaf dibwys (gellir dod o hyd i ddisgrifiad yma). I gopΓ―o ein ffeiliau, gallwn agor Google Cloud Shel o'r bar offer trwy glicio ar yr eicon cyntaf ar y chwith yn Ffigur 2 isod.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 2

Mae'r gorchmynion sydd eu hangen arnom i gopΓ―o'r ffeiliau a gosod y llyfrgelloedd gofynnol wedi'u rhestru isod.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ нашСй Π±Π°Π·Ρ‹ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹

Unwaith y byddwn wedi cwblhau'r holl gamau sy'n gysylltiedig Γ’ gosod, y peth nesaf y mae angen i ni ei wneud yw creu set ddata a thabl yn BigQuery. Mae yna sawl ffordd o wneud hyn, ond y symlaf yw defnyddio consol Google Cloud trwy greu set ddata yn gyntaf. Gallwch ddilyn y camau isod cyswllti greu bwrdd gyda sgema. Bydd gan ein bwrdd 7 colofn, ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΡ… ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚Π°ΠΌ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»ΡŒΡΠΊΠΎΠ³ΠΎ Π»ΠΎΠ³Π°. Для удобства ΠΌΡ‹ ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΠΈΠΌ всС столбцы ΠΊΠ°ΠΊ строки (Ρ‚ΠΈΠΏ string), Π·Π° ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ΠΌ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ timelocal, ΠΈ Π½Π°Π·ΠΎΠ²Π΅ΠΌ ΠΈΡ… Π² соотвСтствии с ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹ΠΌΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΡ‹ сгСнСрировали Ρ€Π°Π½Π΅Π΅. Π‘Ρ…Π΅ΠΌΠ° нашСй Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ Π΄ΠΎΠ»ΠΆΠ½Π° Π²Ρ‹Π³Π»ΡΠ΄Π΅Ρ‚ΡŒ ΠΊΠ°ΠΊ Π½Π° рисункС 3.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 3. Cynllun y tabl

Cyhoeddi data log defnyddwyr

Mae Pub/Sub yn elfen hanfodol o'n piblinell oherwydd ei fod yn caniatΓ‘u i gymwysiadau annibynnol lluosog gyfathrebu Γ’'i gilydd. Yn benodol, mae'n gweithio fel cyfryngwr sy'n ein galluogi i anfon a derbyn negeseuon rhwng ceisiadau. Y peth cyntaf sydd angen i ni ei wneud yw creu pwnc. Yn syml, ewch i Pub/Sub yn y consol a chliciwch CREATE TOPIC.

Mae'r cod isod yn galw ein sgript i gynhyrchu'r data log a ddiffinnir uchod ac yna'n cysylltu ac yn anfon y logiau i Pub / Sub. Yr unig beth sydd angen i ni ei wneud yw creu gwrthrych PublisherClient, nodwch y llwybr i'r pwnc gan ddefnyddio'r dull topic_path ΠΈ Π²Ρ‹Π·Π²Π°Ρ‚ΡŒ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡŽ publish с topic_path a data. Sylwch ein bod yn mewnforio generate_log_line o'n sgript stream_logs, felly gwnewch yn siΕ΅r bod y ffeiliau hyn yn yr un ffolder, fel arall fe gewch wall mewnforio. Yna gallwn redeg hwn trwy ein consol google gan ddefnyddio:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Cyn gynted ag y bydd y ffeil yn rhedeg, byddwn yn gallu gweld allbwn y data log i'r consol, fel y dangosir yn y ffigur isod. Bydd y sgript hon yn gweithio cyn belled nad ydym yn ei defnyddio CTRL + Ci'w gwblhau.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 4. Allbwn publish_logs.py

Ysgrifennu ein cod piblinell

Nawr bod gennym bopeth wedi'i baratoi, gallwn ddechrau'r rhan hwyliog - codio ein piblinell gan ddefnyddio Beam a Python. I greu piblinell Beam, mae angen i ni greu gwrthrych piblinell (p). Unwaith y byddwn wedi creu gwrthrych piblinell, gallwn gymhwyso swyddogaethau lluosog un ar Γ΄l y llall gan ddefnyddio'r gweithredwr pipe (|). Yn gyffredinol, mae'r llif gwaith yn edrych fel y ddelwedd isod.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Yn ein cod, byddwn yn creu dwy swyddogaeth arferiad. Swyddogaeth regex_clean, sy'n sganio'r data ac yn adfer y rhes gyfatebol yn seiliedig ar y rhestr PATTERNS gan ddefnyddio'r swyddogaeth re.search. Mae'r ffwythiant yn dychwelyd llinyn wedi'i wahanu gan goma. Os nad ydych chi'n arbenigwr mynegiant rheolaidd, rwy'n argymell gwirio hyn tiwtorial ac ymarferwch mewn llyfr nodiadau i wirio'r cod. Ar Γ΄l hyn rydym yn diffinio swyddogaeth ParDo arferol o'r enw Hollti, sy'n amrywiad o'r trawsnewid Beam ar gyfer prosesu cyfochrog. Yn Python, gwneir hyn mewn ffordd arbennig - rhaid creu dosbarth sy'n etifeddu o ddosbarth DoFn Beam. Mae'r ffwythiant Hollti yn cymryd y rhes wedi'i dosrannu o'r ffwythiant blaenorol ac yn dychwelyd rhestr o eiriaduron gydag allweddi sy'n cyfateb i enwau'r colofnau yn ein tabl BigQuery. Mae rhywbeth i'w nodi am y swyddogaeth hon: roedd yn rhaid i mi fewnforio datetime Π²Π½ΡƒΡ‚Ρ€ΠΈ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΎΠ½Π° Ρ€Π°Π±ΠΎΡ‚Π°Π»Π°. Π― ΠΏΠΎΠ»ΡƒΡ‡Π°Π» сообщСниС ΠΎΠ± ошибкС ΠΏΡ€ΠΈ ΠΈΠΌΠΏΠΎΡ€Ρ‚Π΅ Π² Π½Π°Ρ‡Π°Π»Π΅ Ρ„Π°ΠΉΠ»Π°, Ρ‡Ρ‚ΠΎ Π±Ρ‹Π»ΠΎ странно. Π­Ρ‚ΠΎΡ‚ список Π·Π°Ρ‚Π΅ΠΌ пСрСдаСтся Π² Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡŽ WriteToBigQuery, sydd yn syml yn ychwanegu ein data at y tabl. Rhoddir y cod ar gyfer Swp DataFlow Job a Streaming DataFlow Job isod. Yr unig wahaniaeth rhwng swp a chod ffrydio yw ein bod yn darllen y CSV ohono mewn swp src_pathdefnyddio'r swyddogaeth ReadFromText o Beam.

Batch DataFlow Job (ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ²)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ffrydio DataFlow Job (prosesu ffrwd)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Cychwyn y cludwr

Gallwn redeg y biblinell mewn sawl ffordd wahanol. Pe baem yn dymuno, gallem ei redeg yn lleol o derfynell wrth fewngofnodi i GCP o bell.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Fodd bynnag, rydyn ni'n mynd i'w redeg gan ddefnyddio DataFlow. Gallwn wneud hyn gan ddefnyddio'r gorchymyn isod trwy osod y paramedrau gofynnol canlynol.

  • project β€” ID eich prosiect GCP.
  • runner yn rhedwr piblinell a fydd yn dadansoddi eich rhaglen ac yn adeiladu eich piblinell. I redeg yn y cwmwl, rhaid i chi nodi DataflowRunner.
  • staging_location - y llwybr i storfa cwmwl Cloud Dataflow ar gyfer mynegeio pecynnau cod sydd eu hangen ar y proseswyr sy'n cyflawni'r gwaith.
  • temp_location - llwybr i storfa cwmwl Cloud Dataflow ar gyfer storio ffeiliau swyddi dros dro a grΓ«wyd tra bod y biblinell yn rhedeg.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Tra bod y gorchymyn hwn yn rhedeg, gallwn fynd i'r tab DataFlow yn y consol google a gweld ein piblinell. Pan fyddwn yn clicio ar y biblinell, dylem weld rhywbeth tebyg i Ffigur 4. At ddibenion dadfygio, gall fod yn ddefnyddiol iawn mynd i Logs ac yna i Stackdriver i weld y logiau manwl. Mae hyn wedi fy helpu i ddatrys problemau piblinellau mewn nifer o achosion.

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 4: Cludwr trawst

Cyrchwch ein data yn BigQuery

Felly, dylai fod gennym eisoes biblinell yn rhedeg gyda data yn llifo i'n tabl. I brofi hyn, gallwn fynd i BigQuery ac edrych ar y data. Ar Γ΄l defnyddio'r gorchymyn isod dylech weld yr ychydig resi cyntaf o'r set ddata. Nawr bod gennym y data wedi'i storio yn BigQuery, gallwn gynnal dadansoddiad pellach, yn ogystal Γ’ rhannu'r data gyda chydweithwyr a dechrau ateb cwestiynau busnes.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Rydym yn creu llif llif prosesu data. Rhan 2
Ffigur 5: BigQuery

Casgliad

НадССмся, Ρ‡Ρ‚ΠΎ этот пост послуТит ΠΏΠΎΠ»Π΅Π·Π½Ρ‹ΠΌ ΠΏΡ€ΠΈΠΌΠ΅Ρ€ΠΎΠΌ создания ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²ΠΎΠ³ΠΎ ΠΊΠΎΠ½Π²Π΅ΠΉΠ΅Ρ€Π° Π΄Π°Π½Π½Ρ‹Ρ…, Π° Ρ‚Π°ΠΊΠΆΠ΅ поиска способов ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ Π±ΠΎΠ»Π΅Π΅ доступными. Π₯Ρ€Π°Π½Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… Π² Ρ‚Π°ΠΊΠΎΠΌ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ Π΄Π°Π΅Ρ‚ Π½Π°ΠΌ ΠΌΠ½ΠΎΠ³ΠΎ прСимущСств. Π’Π΅ΠΏΠ΅Ρ€ΡŒ ΠΌΡ‹ ΠΌΠΎΠΆΠ΅ΠΌ Π½Π°Ρ‡Π°Ρ‚ΡŒ ΠΎΡ‚Π²Π΅Ρ‡Π°Ρ‚ΡŒ Π½Π° Π²Π°ΠΆΠ½Ρ‹Π΅ вопросы, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, сколько людСй ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‚ наш ΠΏΡ€ΠΎΠ΄ΡƒΠΊΡ‚? РастСт Π»ΠΈ со Π²Ρ€Π΅ΠΌΠ΅Π½Π΅ΠΌ Π±Π°Π·Π° ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Π΅ΠΉ? Π‘ ΠΊΠ°ΠΊΠΈΠΌΠΈ аспСктами ΠΏΡ€ΠΎΠ΄ΡƒΠΊΡ‚Π° люди Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡ‚Π²ΡƒΡŽΡ‚ большС всСго? И Π΅ΡΡ‚ΡŒ Π»ΠΈ ошибки, Ρ‚Π°ΠΌ Π³Π΄Π΅ ΠΈΡ… Π±Ρ‹Ρ‚ΡŒ Π½Π΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ? Π­Ρ‚ΠΎ Ρ‚Π΅ вопросы, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ интСрСсны для ΠΎΡ€Π³Π°Π½ΠΈΠ·Π°Ρ†ΠΈΠΈ. На основС ΠΈΠ΄Π΅ΠΉ, Π²Ρ‹Ρ‚Π΅ΠΊΠ°ΡŽΡ‰ΠΈΡ… ΠΈΠ· ΠΎΡ‚Π²Π΅Ρ‚ΠΎΠ² Π½Π° эти вопросы, ΠΌΡ‹ смоТСм ΡƒΡΠΎΠ²Π΅Ρ€ΡˆΠ΅Π½ΡΡ‚Π²ΠΎΠ²Π°Ρ‚ΡŒ ΠΏΡ€ΠΎΠ΄ΡƒΠΊΡ‚ ΠΈ ΠΏΠΎΠ²Ρ‹ΡΠΈΡ‚ΡŒ Π·Π°ΠΈΠ½Ρ‚Π΅Ρ€Π΅ΡΠΎΠ²Π°Π½Π½ΠΎΡΡ‚ΡŒ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Π΅ΠΉ.

Mae Beam yn ddefnyddiol iawn ar gyfer y math hwn o ymarfer corff ac mae ganddo nifer o achosion defnydd diddorol eraill hefyd. Er enghraifft, efallai y byddwch am ddadansoddi data ticio stoc mewn amser real a gwneud crefftau yn seiliedig ar y dadansoddiad, efallai bod gennych ddata synhwyrydd yn dod o gerbydau ac eisiau cyfrifo cyfrifiadau lefel traffig. Gallech hefyd, er enghraifft, fod yn gwmni hapchwarae sy'n casglu data defnyddwyr ac yn ei ddefnyddio i greu dangosfyrddau i olrhain metrigau allweddol. Iawn, foneddigion, mae hwn yn bwnc ar gyfer post arall, diolch am ddarllen, ac i'r rhai sydd am weld y cod llawn, isod mae'r ddolen i fy GitHub.

https://github.com/DFoly/User_log_pipeline

Dyna i gyd. Darllenwch ran un.

Ffynhonnell: hab.com

Ychwanegu sylw