Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2

Բարեւ բոլորին. Կիսվում ենք հոդվածի վերջին մասի թարգմանությամբ՝ պատրաստված հատուկ դասընթացի ուսանողների համար։ Տվյալների ինժեներ. Կարող եք կարդալ առաջին մասը այստեղ.

Apache Beam և DataFlow իրական ժամանակի խողովակաշարերի համար

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2

Google Cloud-ի կարգավորում

Նշում. ես օգտագործել եմ Google Cloud Shell-ը խողովակաշարը գործարկելու և հատուկ տեղեկամատյանների տվյալները հրապարակելու համար, քանի որ խնդիրներ էի ունենում Python 3-ում խողովակաշարը վարելու համար: Google Cloud Shell-ն օգտագործում է Python 2, որն ավելի համահունչ է Apache Beam-ին:

Խողովակաշարը սկսելու համար մենք պետք է մի փոքր փորփրենք պարամետրերը: Ձեզանից նրանց համար, ովքեր նախկինում չեն օգտագործել GCP-ն, դուք պետք է հետևեք հետևյալ 6 քայլերին, որոնք նկարագրված են դրանում էջ.

Դրանից հետո մենք պետք է վերբեռնենք մեր սկրիպտները Google Cloud Storage-ում և պատճենենք դրանք մեր Google Cloud Shel-ում: Ամպային պահեստում վերբեռնումը բավականին աննշան է (կարելի է գտնել նկարագրությունը այստեղ) Մեր ֆայլերը պատճենելու համար մենք կարող ենք բացել Google Cloud Shel-ը գործիքագոտուց՝ սեղմելով ստորև նկար 2-ի ձախ կողմում գտնվող առաջին պատկերակը:

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2
Նկար 2

Ֆայլերը պատճենելու և անհրաժեշտ գրադարանները տեղադրելու համար մեզ անհրաժեշտ հրամանները ներկայացված են ստորև:

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Մեր տվյալների բազայի և աղյուսակի ստեղծում

Երբ մենք ավարտենք կարգավորումների հետ կապված բոլոր քայլերը, հաջորդ բանը, որ մենք պետք է անենք, BigQuery-ում տվյալների բազա և աղյուսակ ստեղծելն է: Դա անելու մի քանի եղանակ կա, բայց ամենապարզը Google Cloud կոնսոլից օգտվելն է՝ նախ ստեղծելով տվյալների բազա: Դուք կարող եք հետևել ստորև նշված քայլերին ՈՒղեցույցսխեմայով աղյուսակ ստեղծելու համար: Մեր սեղանը կունենա 7 սյունակ, որը համապատասխանում է յուրաքանչյուր օգտագործողի գրանցամատյանի բաղադրիչներին: Հարմարության համար մենք բոլոր սյունակները կսահմանենք որպես տողեր, բացառությամբ timelocal փոփոխականի, և դրանք կանվանենք ըստ նախկինում ստեղծած փոփոխականների։ Մեր աղյուսակի դասավորությունը պետք է նման լինի Նկար 3-ում:

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2
Նկար 3. Սեղանի դասավորությունը

Օգտագործողի մատյանների տվյալների հրապարակում

Pub/Sub-ը մեր խողովակաշարի կարևոր բաղադրիչն է, քանի որ այն թույլ է տալիս մի քանի անկախ հավելվածների հաղորդակցվել միմյանց հետ: Մասնավորապես, այն աշխատում է որպես միջնորդ, որը թույլ է տալիս մեզ ուղարկել և ստանալ հաղորդագրություններ հավելվածների միջև: Առաջին բանը, որ մենք պետք է անենք, թեմա ստեղծելն է: Պարզապես գնացեք Pub/Sub կոնսոլում և սեղմեք «ՍՏԵՂԾԵԼ ԹԵՄԱ»:

Ստորև բերված կոդը կանչում է մեր սկրիպտը՝ ստեղծելու վերևում սահմանված մատյան տվյալները, այնուհետև միացնում և ուղարկում է տեղեկամատյանները Pub/Sub-ին: Միակ բանը, որ մենք պետք է անենք, օբյեկտ ստեղծելն է PublisherClient, մեթոդով նշեք թեմայի ուղին topic_path և զանգահարեք ֆունկցիան publish с topic_path և տվյալներ։ Խնդրում ենք նկատի ունենալ, որ մենք ներմուծում ենք generate_log_line մեր սցենարից stream_logs, այնպես որ համոզվեք, որ այս ֆայլերը գտնվում են նույն թղթապանակում, հակառակ դեպքում դուք ներմուծման սխալ կստանաք: Այնուհետև մենք կարող ենք սա գործարկել մեր Google վահանակի միջոցով՝ օգտագործելով.

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Հենց որ ֆայլը գործարկվի, մենք կկարողանանք տեսնել տեղեկամատյանների տվյալների ելքը դեպի վահանակ, ինչպես ցույց է տրված ստորև նկարում: Այս սցենարը կաշխատի այնքան ժամանակ, քանի դեռ մենք չենք օգտագործում CTRL + Cայն ավարտելու համար:

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2
Նկար 4. Արդյունք publish_logs.py

Գրելով մեր խողովակաշարի կոդը

Այժմ, երբ մենք ամեն ինչ պատրաստ ենք, կարող ենք սկսել զվարճալի մասը՝ մեր խողովակաշարի կոդավորումը Beam-ի և Python-ի միջոցով: Beam խողովակաշար ստեղծելու համար մենք պետք է ստեղծենք խողովակաշարի օբյեկտ (p): Երբ մենք ստեղծենք խողովակաշարի օբյեկտ, մենք կարող ենք կիրառել բազմաթիվ գործառույթներ մեկը մյուսի հետևից՝ օգտագործելով օպերատորը pipe (|). Ընդհանուր առմամբ, աշխատանքային հոսքը նման է ստորև ներկայացված պատկերին:

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Մեր կոդում մենք կստեղծենք երկու հատուկ գործառույթ: Գործառույթ regex_clean, որը սկանավորում է տվյալները և առբերում համապատասխան տողը PATTERNS ցուցակի հիման վրա՝ օգտագործելով ֆունկցիան re.search. Ֆունկցիան վերադարձնում է ստորակետերով առանձնացված տող: Եթե ​​դուք սովորական արտահայտման մասնագետ չեք, խորհուրդ եմ տալիս ստուգել սա ուսուցողական և զբաղվեք նոթատետրում՝ կոդը ստուգելու համար: Դրանից հետո մենք սահմանում ենք մաքսային ParDo ֆունկցիա, որը կոչվում է պառակտում, որը Beam փոխակերպման տարբերակ է զուգահեռ մշակման համար։ Python-ում դա արվում է հատուկ ձևով՝ մենք պետք է ստեղծենք դաս, որը ժառանգում է DoFn Beam դասից։ Split ֆունկցիան վերցնում է վերլուծված տողը նախորդ ֆունկցիայից և վերադարձնում է բառարանների ցանկ մեր BigQuery աղյուսակի սյունակների անուններին համապատասխանող ստեղներով: Այս ֆունկցիայի հետ կապված մի բան կա նշելու. ես ստիպված էի ներմուծել datetime ֆունկցիայի ներսում, որպեսզի այն աշխատի: Ես ֆայլի սկզբում ներմուծման սխալ էի ստանում, ինչը տարօրինակ էր: Այս ցուցակն այնուհետև փոխանցվում է ֆունկցիային WriteToBigQuery, որը պարզապես ավելացնում է մեր տվյալները աղյուսակում: Batch DataFlow Job-ի և Streaming DataFlow Job-ի կոդը տրված է ստորև: Փաթեթի և հոսքային կոդի միջև միակ տարբերությունն այն է, որ խմբաքանակում մենք կարդում ենք CSV-ն src_pathօգտագործելով գործառույթը ReadFromText Beam-ից:

Batch DataFlow Job (խմբաքանակի մշակում)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (հոսքի մշակում)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Փոխակրիչի գործարկում

Մենք կարող ենք խողովակաշարը վարել մի քանի տարբեր ձևերով: Եթե ​​ցանկանայինք, մենք պարզապես կարող էինք այն գործարկել լոկալ տերմինալից՝ հեռակա կարգով GCP մուտք գործելիս:

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Այնուամենայնիվ, մենք պատրաստվում ենք այն գործարկել DataFlow-ի միջոցով: Մենք կարող ենք դա անել՝ օգտագործելով ստորև նշված հրամանը՝ սահմանելով հետևյալ պահանջվող պարամետրերը:

  • project — Ձեր GCP նախագծի ID-ն:
  • runner խողովակաշար է, որը կվերլուծի ձեր ծրագիրը և կկառուցի ձեր խողովակաշարը: Ամպում աշխատելու համար դուք պետք է նշեք DataflowRunner:
  • staging_location — ճանապարհը դեպի Cloud Dataflow ամպային պահեստ՝ աշխատանքը կատարող պրոցեսորների կողմից անհրաժեշտ կոդային փաթեթների ինդեքսավորման համար:
  • temp_location — ճանապարհ դեպի Cloud Dataflow ամպային պահեստ՝ խողովակաշարի գործարկման ընթացքում ստեղծված աշխատանքային ժամանակավոր ֆայլերը պահելու համար:
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Մինչ այս հրամանն աշխատում է, մենք կարող ենք գնալ google վահանակի DataFlow ներդիր և դիտել մեր խողովակաշարը: Երբ մենք սեղմում ենք խողովակաշարի վրա, մենք պետք է տեսնենք մի բան, որը նման է Նկար 4-ին: Վրիպազերծման նպատակների համար կարող է շատ օգտակար լինել գնալ Logs, այնուհետև Stackdriver՝ մանրամասն տեղեկամատյանները դիտելու համար: Սա օգնել է ինձ լուծել խողովակաշարի խնդիրները մի շարք դեպքերում:

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2
Նկար 4. Ճառագայթային փոխակրիչ

Մուտք գործեք մեր տվյալները BigQuery-ում

Այսպիսով, մենք արդեն պետք է ունենանք խողովակաշար, որն աշխատում է մեր աղյուսակի մեջ հոսող տվյալների հետ: Սա փորձարկելու համար մենք կարող ենք գնալ BigQuery և նայել տվյալները: Ստորև նշված հրամանն օգտագործելուց հետո դուք պետք է տեսնեք տվյալների հավաքածուի առաջին մի քանի տողերը: Այժմ, երբ մենք ունենք BigQuery-ում պահվող տվյալները, մենք կարող ենք հետագա վերլուծություններ կատարել, ինչպես նաև կիսվել տվյալները գործընկերների հետ և սկսել պատասխանել բիզնեսի հարցերին:

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Մենք ստեղծում ենք հոսքային տվյալների մշակման խողովակաշար: Մաս 2
Նկար 5. BigQuery

Ամփոփում

Հուսով ենք, որ այս գրառումը օգտակար օրինակ կծառայի հոսքային տվյալների խողովակաշար ստեղծելու, ինչպես նաև տվյալների ավելի մատչելի դարձնելու ուղիներ գտնելու համար: Այս ձևաչափով տվյալների պահպանումը մեզ շատ առավելություններ է տալիս: Այժմ մենք կարող ենք սկսել պատասխանել այնպիսի կարևոր հարցերի, ինչպիսիք են, թե քանի մարդ է օգտվում մեր արտադրանքից: Ձեր օգտատերերի բազան ժամանակի ընթացքում աճու՞մ է: Ապրանքի ո՞ր ասպեկտների հետ են մարդիկ ամենաշատը շփվում: Եվ կա՞ն սխալներ, որտեղ չպետք է լինեն: Սրանք այն հարցերն են, որոնք կհետաքրքրեն կազմակերպությանը։ Հիմնվելով այս հարցերի պատասխաններից ստացված պատկերացումների վրա՝ մենք կարող ենք բարելավել արտադրանքը և ավելացնել օգտատերերի ներգրավվածությունը:

Beam-ը իսկապես օգտակար է այս տեսակի վարժությունների համար և ունի նաև մի շարք այլ հետաքրքիր օգտագործման դեպքեր: Օրինակ, դուք կարող եք վերլուծել բաժնետոմսերի տիզերի տվյալները իրական ժամանակում և գործարքներ կատարել վերլուծության հիման վրա, գուցե դուք ունեք սենսորային տվյալներ, որոնք գալիս են տրանսպորտային միջոցներից և ցանկանում եք հաշվարկել երթևեկության մակարդակի հաշվարկները: Դուք կարող եք նաև լինել, օրինակ, խաղային ընկերություն, որը հավաքում է օգտատերերի տվյալները և օգտագործում դրանք՝ ստեղծելու վահանակներ՝ հիմնական չափումները հետևելու համար: Լավ, պարոնայք, սա հերթական գրառման թեմա է, շնորհակալություն կարդալու համար, իսկ նրանց համար, ովքեր ցանկանում են տեսնել ամբողջական կոդը, ստորև ներկայացնում ենք իմ GitHub-ի հղումը:

https://github.com/DFoly/User_log_pipeline

Դա բոլորն են: Կարդացեք առաջին մասը.

Source: www.habr.com

Добавить комментарий