Loome voo andmetöötluskonveieri. 2. osa

Tere kõigile. Jagame artikli viimase osa tõlget, mis on koostatud spetsiaalselt kursuse üliõpilastele. Andmeinsener. Saate lugeda esimest osa siin.

Apache Beam ja DataFlow reaalajas torujuhtmete jaoks

Loome voo andmetöötluskonveieri. 2. osa

Google Cloudi seadistamine

Märkus. Kasutasin konveieri käitamiseks ja kohandatud logiandmete avaldamiseks Google Cloud Shelli, kuna mul oli probleeme konveieri käitamisega Python 3-s. Google Cloud Shell kasutab Python 2, mis on Apache Beamiga paremini kooskõlas.

Torujuhtme käivitamiseks peame veidi seadetesse süvenema. Need, kes pole varem GCP-d kasutanud, peate järgima järgmisi 6 selles kirjeldatud sammu lehekülg.

Pärast seda peame oma skriptid üles laadima teenusesse Google Cloud Storage ja kopeerima need oma Google Cloud Sheli. Pilvesalvestusse üleslaadimine on üsna triviaalne (kirjelduse leiate siin). Failide kopeerimiseks saame avada Google Cloud Sheli tööriistaribalt, klõpsates alloleval joonisel 2 vasakul esimesel ikoonil.

Loome voo andmetöötluskonveieri. 2. osa
Joonis 2

Käsud, mida vajame failide kopeerimiseks ja vajalike teekide installimiseks, on loetletud allpool.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Meie andmebaasi ja tabeli loomine

Kui oleme kõik seadistamisega seotud toimingud lõpetanud, peame järgmiseks looma BigQuerys andmestiku ja tabeli. Selleks on mitu võimalust, kuid kõige lihtsam on kasutada Google Cloudi konsooli, luues esmalt andmestiku. Saate järgida allolevaid samme linkskeemiga tabeli loomiseks. Meie lauale jääb 7 veergu, mis vastab iga kasutajalogi komponentidele. Mugavuse huvides määratleme kõik veerud stringidena, välja arvatud ajalokaalne muutuja, ja nimetame need vastavalt varem genereeritud muutujatele. Meie tabeli paigutus peaks välja nägema nagu joonisel 3.

Loome voo andmetöötluskonveieri. 2. osa
Joonis 3. Tabeli paigutus

Kasutaja logiandmete avaldamine

Pub/Sub on meie konveieri kriitiline komponent, kuna see võimaldab mitmel sõltumatul rakendusel üksteisega suhelda. Eelkõige toimib see vahendajana, mis võimaldab meil rakenduste vahel sõnumeid saata ja vastu võtta. Esimese asjana peame looma teema. Lihtsalt avage konsoolis Pub/Sub ja klõpsake nuppu LOO TEEMA.

Allolev kood kutsub üles meie skripti, et genereerida ülalmääratletud logiandmed ning seejärel ühendatakse ja saadavad logid Pub/Sub. Ainus, mida me tegema peame, on luua objekt PublisherClient, määrake meetodi abil teema tee topic_path ja helistage funktsioonile publish с topic_path ja andmed. Pange tähele, et me impordime generate_log_line meie stsenaariumist stream_logs, seega veenduge, et need failid oleksid samas kaustas, vastasel juhul kuvatakse imporditõrge. Seejärel saame seda oma Google'i konsooli kaudu käivitada, kasutades:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Niipea kui fail käivitub, näeme logiandmete väljundit konsooli, nagu on näidatud alloleval joonisel. See skript töötab seni, kuni me seda ei kasuta CTRL + Cselle lõpuleviimiseks.

Loome voo andmetöötluskonveieri. 2. osa
Joonis 4. Väljund publish_logs.py

Meie torujuhtme koodi kirjutamine

Nüüd, kui oleme kõik ette valmistanud, saame alustada lõbusa osaga – oma konveieri kodeerimisega Beami ja Pythoni abil. Beam torujuhtme loomiseks peame looma konveieri objekti (p). Kui oleme konveieriobjekti loonud, saame operaatori abil üksteise järel rakendada mitut funktsiooni pipe (|). Üldiselt näeb töövoog välja nagu alloleval pildil.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Meie koodis loome kaks kohandatud funktsiooni. Funktsioon regex_clean, mis skannib andmed ja toob funktsiooni abil välja vastava rea ​​loendi MUSTRID alusel re.search. Funktsioon tagastab komadega eraldatud stringi. Kui te pole regulaaravaldiste ekspert, soovitan seda vaadata õpetus ja harjutage koodi kontrollimist märkmikus. Pärast seda määratleme kohandatud ParDo funktsiooni nimega lõhe, mis on Beam teisenduse variatsioon paralleelseks töötlemiseks. Pythonis tehakse seda erilisel viisil – peame looma klassi, mis pärib klassist DoFn Beam. Funktsioon Split võtab eelmisest funktsioonist sõelutud rea ja tagastab sõnaraamatute loendi võtmetega, mis vastavad meie BigQuery tabeli veergude nimedele. Selle funktsiooni kohta on midagi märkida: ma pidin importima datetime funktsiooni sees, et see toimiks. Sain faili alguses impordivea, mis oli imelik. Seejärel edastatakse see loend funktsioonile WriteToBigQuery, mis lihtsalt lisab meie andmed tabelisse. Batch DataFlow töö ja voogesituse DataFlow töö kood on toodud allpool. Ainus erinevus partii- ja voogedastuskoodi vahel on see, et partiides loeme CSV-d src_pathfunktsiooni kasutades ReadFromText alates Beamist.

Batch DataFlow töö (partiitöötlus)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Voogesitus DataFlow töö (voo töötlemine)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Konveieri käivitamine

Saame torujuhtme juhtida mitmel erineval viisil. Soovi korral saaksime seda lihtsalt terminalist kohapeal käivitada, logides samal ajal GCP-sse kaugjuhtimisega.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Siiski käivitame selle DataFlow abil. Seda saame teha alloleva käsu abil, määrates järgmised nõutavad parameetrid.

  • project — teie GCP projekti ID.
  • runner on torujuhtme käivitaja, mis analüüsib teie programmi ja koostab teie torujuhtme. Pilves käitamiseks peate määrama DataflowRunneri.
  • staging_location — tee Cloud Dataflow pilvmälu juurde, et indekseerida töid teostavatele protsessoritele vajalikke koodipakette.
  • temp_location — tee pilveandmete voo pilvesalvestusele, et salvestada konveieri töötamise ajal loodud ajutisi tööfaile.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Kui see käsk töötab, saame minna Google'i konsooli vahekaardile DataFlow ja vaadata oma konveierit. Kui klõpsame konveieril, peaksime nägema midagi sarnast nagu joonisel 4. Silumiseks võib olla väga kasulik avada logid ja seejärel Stackdriver, et vaadata üksikasjalikke logisid. See on aidanud mul mitmel juhul torujuhtmega seotud probleeme lahendada.

Loome voo andmetöötluskonveieri. 2. osa
Joonis 4: Tala konveier

Juurdepääs meie andmetele BigQuerys

Seega peaks meil juba olema konveier, mille andmed voolavad meie tabelisse. Selle testimiseks võime minna BigQuerysse ja vaadata andmeid. Pärast alloleva käsu kasutamist peaksite nägema andmestiku paar esimest rida. Nüüd, kui andmed on BigQuerys salvestatud, saame teha täiendavaid analüüse, samuti jagada andmeid kolleegidega ja hakata vastama äriküsimustele.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Loome voo andmetöötluskonveieri. 2. osa
Joonis 5: BigQuery

Järeldus

Loodame, et see postitus on kasulik näide voogesituse andmekanali loomisest ja andmete kättesaadavamaks muutmise viiside leidmisest. Andmete salvestamine selles vormingus annab meile palju eeliseid. Nüüd saame hakata vastama olulistele küsimustele, näiteks kui palju inimesi meie toodet kasutab? Kas teie kasutajaskond kasvab aja jooksul? Milliste toote aspektidega inimesed kõige rohkem suhtlevad? Ja kas on vigu seal, kus ei tohiks olla? Need on küsimused, mis organisatsioonile huvi pakuvad. Nendele küsimustele antud vastustest tulenevate arusaamade põhjal saame toodet täiustada ja suurendada kasutajate seotust.

Beam on seda tüüpi harjutuste jaoks tõesti kasulik ja sellel on ka mitmeid muid huvitavaid kasutusjuhtumeid. Näiteks võib tekkida soov analüüsida aktsiahindade andmeid reaalajas ja teha tehinguid analüüsi põhjal, võib-olla on teil sõidukitelt pärit anduriandmeid ja soovite arvutada liiklustaseme arvutusi. Samuti võite olla näiteks mängufirma, mis kogub kasutajaandmeid ja kasutab neid armatuurlaudade loomiseks põhimõõdikute jälgimiseks. Olgu, härrased, see on teise postituse teema, tänan lugemise eest ja neile, kes soovivad täielikku koodi näha, on allpool link minu GitHubi juurde.

https://github.com/DFoly/User_log_pipeline

See on kõik. Lugege esimest osa.

Allikas: www.habr.com

Lisa kommentaar