Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa

Sveiki visiem. Dalāmies ar raksta beigu daļas tulkojumu, kas sagatavots īpaši kursa studentiem. Datu inženieris. Jūs varat izlasīt pirmo daļu šeit.

Apache Beam un DataFlow reāllaika cauruļvadiem

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa

Google mākoņa iestatīšana

Piezīme. Es izmantoju Google Cloud Shell, lai palaistu konveijeru un publicētu pielāgotus žurnāla datus, jo man radās problēmas ar cauruļvada palaišanu programmā Python 3. Google Cloud Shell izmanto Python 2, kas vairāk atbilst Apache Beam.

Lai sāktu cauruļvadu, mums ir nedaudz jāiedziļinās iestatījumos. Tiem no jums, kuri iepriekš nav izmantojuši GSP, jums būs jāveic tālāk norādītās 6 darbības, kas aprakstītas šajā sadaļā lappuse.

Pēc tam mums būs jāaugšupielādē savi skripti pakalpojumā Google Cloud Storage un jāpārkopē uz mūsu Google Cloud Shel. Augšupielāde mākoņkrātuvē ir diezgan triviāla (aprakstu var atrast šeit). Lai kopētu savus failus, mēs varam atvērt Google Cloud Shel no rīkjoslas, noklikšķinot uz pirmās ikonas kreisajā 2. attēlā zemāk.

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa
Skaitlis 2

Tālāk ir norādītas komandas, kas mums vajadzīgas, lai kopētu failus un instalētu nepieciešamās bibliotēkas.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Mūsu datu bāzes un tabulas izveide

Kad esam pabeiguši visas ar iestatīšanu saistītās darbības, nākamā lieta, kas mums jādara, ir BigQuery izveidot datu kopu un tabulu. Ir vairāki veidi, kā to izdarīt, taču vienkāršākais ir izmantot Google Cloud konsoli, vispirms izveidojot datu kopu. Varat veikt tālāk norādītās darbības saitelai izveidotu tabulu ar shēmu. Mūsu galdam būs 7 kolonnas, kas atbilst katra lietotāja žurnāla sastāvdaļām. Ērtības labad visas kolonnas definēsim kā virknes, izņemot laika lokālo mainīgo, un nosauksim tās atbilstoši iepriekš ģenerētajiem mainīgajiem. Mūsu tabulas izkārtojumam vajadzētu izskatīties kā 3. attēlā.

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa
3. attēls. Tabulas izkārtojums

Lietotāju žurnāla datu publicēšana

Pub/Sub ir būtiska mūsu konveijera sastāvdaļa, jo tā ļauj vairākām neatkarīgām lietojumprogrammām sazināties savā starpā. Jo īpaši tas darbojas kā starpnieks, kas ļauj mums nosūtīt un saņemt ziņojumus starp lietojumprogrammām. Pirmā lieta, kas mums jādara, ir izveidot tēmu. Vienkārši dodieties uz Pub/Sub konsolē un noklikšķiniet uz IZVEIDOT TĒMU.

Tālāk norādītais kods izsauc mūsu skriptu, lai ģenerētu iepriekš definētos žurnāla datus, un pēc tam izveido savienojumu un nosūta žurnālus uz Pub/Sub. Vienīgais, kas mums jādara, ir izveidot objektu PublisherClient, norādiet ceļu uz tēmu, izmantojot metodi topic_path un izsauciet funkciju publish с topic_path un dati. Lūdzu, ņemiet vērā, ka mēs importējam generate_log_line no mūsu skripta stream_logs, tāpēc pārliecinieties, vai šie faili atrodas vienā mapē, pretējā gadījumā tiks parādīta importēšanas kļūda. Pēc tam mēs to varam palaist, izmantojot mūsu Google konsoli, izmantojot:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Tiklīdz fails tiks palaists, mēs varēsim redzēt žurnāla datu izvadi konsolei, kā parādīts attēlā zemāk. Šis skripts darbosies tik ilgi, kamēr mēs to neizmantosim CTRL + Clai to pabeigtu.

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa
4. attēls. Izvade publish_logs.py

Rakstām mūsu cauruļvada kodu

Tagad, kad viss ir sagatavots, varam sākt jautro daļu — mūsu konveijera kodēšanu, izmantojot Beam un Python. Lai izveidotu Beam cauruļvadu, mums ir jāizveido konveijera objekts (p). Kad esam izveidojuši konveijera objektu, mēs varam lietot vairākas funkcijas vienu pēc otras, izmantojot operatoru pipe (|). Kopumā darbplūsma izskatās kā attēlā zemāk.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Mūsu kodā mēs izveidosim divas pielāgotas funkcijas. Funkcija regex_clean, kas skenē datus un izgūst atbilstošo rindu, pamatojoties uz RAKSTU sarakstu, izmantojot funkciju re.search. Funkcija atgriež ar komatu atdalītu virkni. Ja neesat regulāro izteiksmju eksperts, iesaku to pārbaudīt pamācība un praktizējieties piezīmju grāmatiņā, lai pārbaudītu kodu. Pēc tam mēs definējam pielāgotu ParDo funkciju, ko sauc sadalīt, kas ir staru transformācijas variants paralēlai apstrādei. Python tas tiek darīts īpašā veidā - mums ir jāizveido klase, kas manto no DoFn Beam klases. Funkcija Sadalīt paņem parsēto rindu no iepriekšējās funkcijas un atgriež vārdnīcu sarakstu ar atslēgām, kas atbilst kolonnu nosaukumiem mūsu BigQuery tabulā. Ir kaut kas, kas jāņem vērā saistībā ar šo funkciju: man bija jāimportē datetime funkcijā, lai tā darbotos. Faila sākumā es saņēmu importēšanas kļūdu, kas bija dīvaini. Pēc tam šis saraksts tiek nodots funkcijai WriteToBigQuery, kas vienkārši pievieno mūsu datus tabulai. Batch DataFlow Job un Streaming DataFlow Job kods ir norādīts tālāk. Vienīgā atšķirība starp pakešu un straumēšanas kodu ir tā, ka paketē mēs lasām CSV no src_pathizmantojot funkciju ReadFromText no Beam.

Pakešu datu plūsmas darbs (pakešu apstrāde)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Straumēšanas DataFlow darbs (straumes apstrāde)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Konveijera palaišana

Mēs varam vadīt cauruļvadu vairākos dažādos veidos. Ja mēs vēlamies, mēs varētu to vienkārši palaist lokāli no termināļa, vienlaikus attālināti piesakoties GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tomēr mēs to darbosim, izmantojot DataFlow. Mēs to varam izdarīt, izmantojot tālāk norādīto komandu, iestatot šādus nepieciešamos parametrus.

  • project — jūsu GCP projekta ID.
  • runner ir cauruļvadu palaišanas programma, kas analizēs jūsu programmu un izveidos jūsu cauruļvadu. Lai palaistu mākonī, ir jānorāda DataflowRunner.
  • staging_location — ceļš uz Cloud Dataflow mākoņkrātuvi, lai indeksētu koda pakotnes, kas nepieciešamas procesoriem, kas veic darbu.
  • temp_location — ceļš uz mākoņa datu plūsmas mākoņkrātuvi pagaidu darba failu glabāšanai, kas izveidoti, kamēr darbojas konveijers.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Kamēr šī komanda darbojas, mēs varam doties uz cilni DataFlow Google konsolē un skatīt mūsu konveijeru. Kad mēs noklikšķinām uz konveijera, mums vajadzētu redzēt kaut ko līdzīgu 4. attēlā. Atkļūdošanas nolūkos var būt ļoti noderīgi doties uz žurnālus un pēc tam uz Stackdriver, lai skatītu detalizētus žurnālus. Tas man vairākos gadījumos ir palīdzējis atrisināt cauruļvada problēmas.

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa
4. attēls. Siju konveijers

Piekļūstiet mūsu datiem pakalpojumā BigQuery

Tātad mums jau vajadzētu darboties konveijeram, kurā dati ieplūst mūsu tabulā. Lai to pārbaudītu, mēs varam doties uz BigQuery un apskatīt datus. Pēc tālāk norādītās komandas izmantošanas jums vajadzētu redzēt dažas pirmās datu kopas rindas. Tagad, kad BigQuery ir glabāti dati, mēs varam veikt turpmāku analīzi, kā arī kopīgot datus ar kolēģiem un sākt atbildēt uz biznesa jautājumiem.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Mēs izveidojam straumes datu apstrādes cauruļvadu. 2. daļa
5. attēls: BigQuery

Secinājums

Mēs ceram, ka šī ziņa kalpos kā noderīgs piemērs straumēšanas datu cauruļvada izveidei, kā arī veidu atrašanai, kā padarīt datus pieejamākus. Datu glabāšana šajā formātā sniedz mums daudzas priekšrocības. Tagad mēs varam sākt atbildēt uz svarīgiem jautājumiem, piemēram, cik cilvēku izmanto mūsu produktu? Vai jūsu lietotāju bāze laika gaitā pieaug? Ar kādiem produkta aspektiem cilvēki mijiedarbojas visvairāk? Un vai ir kļūdas tur, kur tām nevajadzētu būt? Šie ir jautājumi, kas interesēs organizāciju. Pamatojoties uz ieskatiem, kas izriet no atbildēm uz šiem jautājumiem, mēs varam uzlabot produktu un palielināt lietotāju iesaisti.

Beam ir patiešām noderīgs šāda veida vingrinājumiem, un tam ir arī vairāki citi interesanti lietošanas gadījumi. Piemēram, iespējams, vēlēsities analizēt akciju zīmju datus reāllaikā un veikt darījumus, pamatojoties uz analīzi, iespējams, jums ir sensoru dati, kas nāk no transportlīdzekļiem un vēlaties aprēķināt satiksmes līmeņa aprēķinus. Varat arī, piemēram, būt spēļu uzņēmums, kas apkopo lietotāju datus un izmanto tos, lai izveidotu informācijas paneļus, lai izsekotu galvenos rādītājus. Labi, kungi, šī ir tēma citam ierakstam, paldies, ka izlasījāt, un tiem, kas vēlas redzēt pilnu kodu, zemāk ir saite uz manu GitHub.

https://github.com/DFoly/User_log_pipeline

Tas ir viss. Izlasi pirmo daļu.

Avots: www.habr.com

Pievieno komentāru