Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2

Pershendetje te gjitheve. Po ndajmë përkthimin e pjesës së fundit të artikullit, të përgatitur posaçërisht për studentët e kursit. "Inxhinier i të dhënave". Ju mund të lexoni pjesën e parë këtu.

Apache Beam dhe Data Flow për tubacionet në kohë reale

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2

Konfigurimi i Google Cloud

Shënim: Kam përdorur Google Cloud Shell për të ekzekutuar gazsjellësin dhe për të publikuar të dhënat e regjistrit të personalizuar sepse kisha probleme me drejtimin e tubacionit në Python 3. Google Cloud Shell përdor Python 2, i cili është më në përputhje me Apache Beam.

Për të filluar tubacionin, duhet të gërmojmë pak në cilësimet. Për ata prej jush që nuk e kanë përdorur GCP më parë, do t'ju duhet të ndiqni 6 hapat e mëposhtëm të përshkruar në këtë faqe.

Pas kësaj, do të na duhet të ngarkojmë skriptet tona në Google Cloud Storage dhe t'i kopjojmë ato në Google Cloud Shel. Ngarkimi në ruajtjen e cloud është mjaft i parëndësishëm (mund të gjendet një përshkrim këtu). Për të kopjuar skedarët tanë, ne mund të hapim Google Cloud Shel nga shiriti i veglave duke klikuar ikonën e parë në të majtë në Figurën 2 më poshtë.

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2
Figura 2

Komandat që na duhen për të kopjuar skedarët dhe për të instaluar bibliotekat e kërkuara janë renditur më poshtë.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Krijimi i bazës së të dhënave dhe tabelës sonë

Pasi të kemi përfunduar të gjitha hapat që lidhen me konfigurimin, gjëja tjetër që duhet të bëjmë është të krijojmë një grup të dhënash dhe tabelë në BigQuery. Ka disa mënyra për ta bërë këtë, por më e thjeshta është të përdorni tastierën Google Cloud duke krijuar fillimisht një grup të dhënash. Ju mund të ndiqni hapat e mëposhtëm lidhjepër të krijuar një tabelë me një skemë. Tabela jonë do të ketë 7 kolona, që korrespondon me komponentët e çdo regjistri të përdoruesit. Për lehtësi, ne do t'i përcaktojmë të gjitha kolonat si vargje, përveç ndryshores timelocal, dhe do t'i emërtojmë ato sipas variablave që kemi krijuar më parë. Paraqitja e tabelës sonë duhet të duket si në Figurën 3.

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2
Figura 3. Paraqitja e tabelës

Publikimi i të dhënave të regjistrit të përdoruesit

Pub/Sub është një komponent kritik i tubacionit tonë sepse lejon shumë aplikacione të pavarura të komunikojnë me njëri-tjetrin. Në veçanti, ai funksionon si një ndërmjetës që na lejon të dërgojmë dhe marrim mesazhe midis aplikacioneve. Gjëja e parë që duhet të bëjmë është të krijojmë një temë. Thjesht shkoni te Pub/Sub në tastierë dhe klikoni CREATE TOPIC.

Kodi më poshtë thërret skenarin tonë për të gjeneruar të dhënat e log -it të përcaktuara më lart dhe më pas lidh dhe dërgon shkrimet në PUB/SUB. E vetmja gjë që duhet të bëjmë është të krijojmë një objekt PublisherClient, specifikoni rrugën drejt temës duke përdorur metodën topic_path dhe thirrni funksionin publish с topic_path dhe të dhëna. Ju lutemi vini re se ne importojmë generate_log_line nga skenari ynë stream_logs, prandaj sigurohuni që këta skedarë të jenë në të njëjtën dosje, përndryshe do të merrni një gabim importi. Më pas mund ta ekzekutojmë këtë përmes tastierës sonë të Google duke përdorur:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sapo skedari të ekzekutohet, ne do të jemi në gjendje të shohim daljen e të dhënave të regjistrit në tastierë, siç tregohet në figurën më poshtë. Ky skenar do të funksionojë për sa kohë që ne nuk e përdorim CTRL + Cpër ta përfunduar atë.

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2
Figura 4. Prodhimi publish_logs.py

Shkrimi i kodit tonë të tubacionit

Tani që kemi gjithçka të përgatitur, mund të fillojmë pjesën argëtuese - kodimin e tubacionit tonë duke përdorur Beam dhe Python. Për të krijuar një tubacion Beam, duhet të krijojmë një objekt tubacioni (p). Pasi të kemi krijuar një objekt tubacioni, ne mund të aplikojmë funksione të shumta njëri pas tjetrit duke përdorur operatorin pipe (|). Në përgjithësi, rrjedha e punës duket si imazhi më poshtë.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Në kodin tonë, ne do të krijojmë dy funksione të personalizuara. Funksioni regex_clean, i cili skanon të dhënat dhe merr rreshtin përkatës bazuar në listën PATTERNS duke përdorur funksionin re.search. Funksioni kthen një varg të ndarë me presje. Nëse nuk jeni ekspert i shprehjes së rregullt, ju rekomandoj ta kontrolloni këtë tutorial dhe praktikoni në një bllok shënimesh për të kontrolluar kodin. Pas kësaj ne përcaktojmë një funksion të personalizuar ParDo të quajtur ndarje, i cili është një variant i transformimit të rrezes për përpunim paralel. Në Python, kjo bëhet në një mënyrë të veçantë - ne duhet të krijojmë një klasë që trashëgon nga klasa DoFn Beam. Funksioni Split merr rreshtin e analizuar nga funksioni i mëparshëm dhe kthen një listë fjalorësh me çelësa që korrespondojnë me emrat e kolonave në tabelën tonë BigQuery. Ka diçka për t'u shënuar në lidhje me këtë funksion: më duhej të importoja datetime brenda një funksioni për ta bërë atë të funksionojë. Po merrja një gabim importi në fillim të skedarit, i cili ishte i çuditshëm. Kjo listë më pas i kalohet funksionit WriteToBigQuery, e cila thjesht shton të dhënat tona në tabelë. Kodi për punë Batch DataFlow dhe Transmeting DataFlow Job është dhënë më poshtë. Dallimi i vetëm midis kodit të grupit dhe atij të transmetimit është se në grup lexojmë CSV-në nga src_pathduke përdorur funksionin ReadFromText nga Rrezja.

Puna e rrjedhës së të dhënave në grup (përpunim grupor)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Transmetimi i punës së rrjedhës së të dhënave (përpunimi i transmetimit)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Nisja e transportuesit

Ne mund ta drejtojmë tubacionin në disa mënyra të ndryshme. Nëse dëshironim, ne thjesht mund ta ekzekutonim atë në nivel lokal nga një terminal ndërsa identifikoheshim në GCP nga distanca.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Sidoqoftë, ne do ta ekzekutojmë atë duke përdorur DataFlow. Ne mund ta bëjmë këtë duke përdorur komandën e mëposhtme duke vendosur parametrat e mëposhtëm të kërkuar.

  • project — ID-ja e projektit tuaj GCP.
  • runner është një drejtues tubacioni që do të analizojë programin tuaj dhe do të ndërtojë tubacionin tuaj. Për të ekzekutuar në cloud, duhet të specifikoni një DataflowRunner.
  • staging_location — shtegu drejt ruajtjes së resë së të dhënave të Cloud për indeksimin e paketave të kodeve të nevojshme nga përpunuesit që kryejnë punën.
  • temp_location — shtegu për në hapësirën ruajtëse të Cloud Dataflow për ruajtjen e skedarëve të përkohshëm të punës të krijuara ndërsa gazsjellësi po funksionon.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Ndërsa kjo komandë po ekzekutohet, ne mund të shkojmë te skeda DataFlow në tastierën e Google dhe të shikojmë tubacionin tonë. Kur klikojmë në tubacion, duhet të shohim diçka të ngjashme me Figurën 4. Për qëllime korrigjimi, mund të jetë shumë e dobishme të shkojmë te Logs dhe më pas te Stackdriver për të parë regjistrat e detajuar. Kjo më ka ndihmuar të zgjidh çështjet e tubacionit në një numër rastesh.

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2
Figura 4: Transportues me rreze

Hyni në të dhënat tona në BigQuery

Pra, ne duhet të kemi tashmë një tubacion që funksionon me të dhëna që rrjedhin në tabelën tonë. Për ta testuar këtë, ne mund të shkojmë në BigQuery dhe të shikojmë të dhënat. Pas përdorimit të komandës më poshtë, duhet të shihni rreshtat e parë të grupit të të dhënave. Tani që kemi të dhënat e ruajtura në BigQuery, mund të bëjmë analiza të mëtejshme, si dhe të ndajmë të dhënat me kolegët dhe të fillojmë t'u përgjigjemi pyetjeve të biznesit.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Ne krijojmë një tubacion për përpunimin e të dhënave të rrjedhës. Pjesa 2
Figura 5: BigQuery

Përfundim

Shpresojmë që ky postim të shërbejë si një shembull i dobishëm për krijimin e një tubacioni të të dhënave të transmetimit, si dhe gjetjen e mënyrave për t'i bërë të dhënat më të aksesueshme. Ruajtja e të dhënave në këtë format na jep shumë përparësi. Tani mund të fillojmë t'u përgjigjemi pyetjeve të rëndësishme si p.sh. sa njerëz e përdorin produktin tonë? A po rritet baza juaj e përdoruesve me kalimin e kohës? Me cilat aspekte të produktit ndërveprojnë më shumë njerëzit? Dhe a ka gabime aty ku nuk duhet të ketë? Këto janë pyetjet që do të jenë me interes për organizatën. Bazuar në njohuritë që dalin nga përgjigjet e këtyre pyetjeve, ne mund të përmirësojmë produktin dhe të rrisim angazhimin e përdoruesve.

Beam është vërtet i dobishëm për këtë lloj ushtrimi dhe gjithashtu ka një numër rastesh të tjera interesante përdorimi. Për shembull, mund të dëshironi të analizoni të dhënat e rriqrave të aksioneve në kohë reale dhe të bëni tregti bazuar në analizat, ndoshta keni të dhëna sensori që vijnë nga automjetet dhe dëshironi të llogaritni llogaritjet e nivelit të trafikut. Ju gjithashtu, për shembull, mund të jeni një kompani lojrash që mbledh të dhënat e përdoruesve dhe i përdor ato për të krijuar panele kontrolli për të gjurmuar metrikat kryesore. Mirë, zotërinj, kjo është një temë për një postim tjetër, faleminderit për lexim, dhe për ata që duan të shohin kodin e plotë, më poshtë është lidhja për në GitHub tim.

https://github.com/DFoly/User_log_pipeline

Kjo eshte e gjitha. Lexoni pjesën e parë.

Burimi: www.habr.com

Shto një koment