Vi skapar en pipeline för bearbetning av strömdata. Del 2

Hej alla. Vi delar översättningen av den sista delen av artikeln, förberedd specifikt för kursens studenter. Dataingenjör. Du kan läsa den första delen här.

Apache Beam och DataFlow för realtidspipelines

Vi skapar en pipeline för bearbetning av strömdata. Del 2

Konfigurera Google Cloud

Obs: Jag använde Google Cloud Shell för att köra pipeline och publicera anpassade loggdata eftersom jag hade problem med att köra pipeline i Python 3. Google Cloud Shell använder Python 2, vilket är mer konsekvent med Apache Beam.

För att starta pipelinen måste vi gräva lite i inställningarna. För er som inte har använt GCP tidigare måste ni följa följande 6 steg som beskrivs i detta странице.

Efter detta måste vi ladda upp våra skript till Google Cloud Storage och kopiera dem till vår Google Cloud Shel. Att ladda upp till molnlagring är ganska trivialt (en beskrivning kan hittas här). För att kopiera våra filer kan vi öppna Google Cloud Shel från verktygsfältet genom att klicka på den första ikonen till vänster i figur 2 nedan.

Vi skapar en pipeline för bearbetning av strömdata. Del 2
Figur 2

De kommandon vi behöver för att kopiera filerna och installera de nödvändiga biblioteken listas nedan.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Skapar vår databas och tabell

När vi har slutfört alla installationsrelaterade steg är nästa sak vi behöver göra att skapa en datauppsättning och en tabell i BigQuery. Det finns flera sätt att göra detta, men det enklaste är att använda Google Cloud-konsolen genom att först skapa en datauppsättning. Du kan följa stegen nedan länkför att skapa en tabell med ett schema. Vårt bord kommer att ha 7 kolumner, motsvarande komponenterna i varje användarlogg. För enkelhetens skull kommer vi att definiera alla kolumner som strängar, förutom den tidslokala variabeln, och namnge dem enligt de variabler vi genererade tidigare. Layouten på vårt bord ska se ut som i figur 3.

Vi skapar en pipeline för bearbetning av strömdata. Del 2
Figur 3. Tabelllayout

Publicering av användarloggdata

Pub/Sub är en kritisk komponent i vår pipeline eftersom den tillåter flera oberoende applikationer att kommunicera med varandra. I synnerhet fungerar det som en mellanhand som gör att vi kan skicka och ta emot meddelanden mellan applikationer. Det första vi behöver göra är att skapa ett ämne. Gå bara till Pub/Sub i konsolen och klicka på CREATE TOPIC.

Koden nedan anropar vårt skript för att generera loggdata som definierats ovan och ansluter sedan och skickar loggarna till Pub/Sub. Det enda vi behöver göra är att skapa ett objekt PublisherClient, ange sökvägen till ämnet med metoden topic_path och anropa funktionen publish с topic_path och data. Observera att vi importerar generate_log_line från vårt manus stream_logs, så se till att dessa filer finns i samma mapp, annars får du ett importfel. Vi kan sedan köra detta via vår Google-konsol med hjälp av:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Så snart filen körs kommer vi att kunna se utdata från loggdata till konsolen, som visas i figuren nedan. Det här skriptet kommer att fungera så länge vi inte använder det CTRL + Cför att slutföra den.

Vi skapar en pipeline för bearbetning av strömdata. Del 2
Figur 4. Utgång publish_logs.py

Skriver vår pipeline-kod

Nu när vi har allt förberett kan vi börja den roliga delen - koda vår pipeline med Beam och Python. För att skapa en Beam-pipeline måste vi skapa ett pipeline-objekt (p). När vi väl har skapat ett pipelineobjekt kan vi tillämpa flera funktioner efter varandra med hjälp av operatören pipe (|). I allmänhet ser arbetsflödet ut som på bilden nedan.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

I vår kod kommer vi att skapa två anpassade funktioner. Fungera regex_clean, som skannar data och hämtar motsvarande rad baserat på PATTERNS-listan med funktionen re.search. Funktionen returnerar en kommaseparerad sträng. Om du inte är expert på reguljära uttryck rekommenderar jag att du kollar in detta handledning och öva i ett anteckningsblock för att kontrollera koden. Efter detta definierar vi en anpassad ParDo-funktion som kallas Split, som är en variant av Beam-transformen för parallell bearbetning. I Python görs detta på ett speciellt sätt – vi måste skapa en klass som ärver från DoFn Beam-klassen. Funktionen Split tar den analyserade raden från föregående funktion och returnerar en lista med ordböcker med nycklar som motsvarar kolumnnamnen i vår BigQuery-tabell. Det finns något att notera om den här funktionen: Jag var tvungen att importera datetime inuti en funktion för att få det att fungera. Jag fick ett importfel i början av filen, vilket var konstigt. Denna lista skickas sedan till funktionen WriteToBigQuery, som helt enkelt lägger till vår data i tabellen. Koden för Batch DataFlow Job och Streaming DataFlow Job ges nedan. Den enda skillnaden mellan batch och streamingkod är att vi i batch läser CSV från src_pathanvänder funktionen ReadFromText från Beam.

Batch DataFlow Job (batchbearbetning)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (strömbehandling)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Startar transportören

Vi kan driva pipelinen på flera olika sätt. Om vi ​​ville kunde vi bara köra det lokalt från en terminal samtidigt som vi loggar in på GCP på distans.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Vi kommer dock att köra det med DataFlow. Vi kan göra detta med kommandot nedan genom att ställa in följande nödvändiga parametrar.

  • project — ID för ditt GCP-projekt.
  • runner är en pipeline runner som kommer att analysera ditt program och konstruera din pipeline. För att köra i molnet måste du ange en DataflowRunner.
  • staging_location — Sökvägen till molnlagringen för Cloud Dataflow för indexering av kodpaket som behövs av de processorer som utför arbetet.
  • temp_location — sökväg till molnlagringen för Cloud Dataflow för lagring av temporära jobbfiler som skapats medan pipelinen körs.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Medan det här kommandot körs kan vi gå till fliken DataFlow i Google-konsolen och se vår pipeline. När vi klickar på pipelinen bör vi se något som liknar figur 4. För felsökningsändamål kan det vara till stor hjälp att gå till Logs och sedan till Stackdriver för att se de detaljerade loggarna. Detta har hjälpt mig att lösa pipelineproblem i ett antal fall.

Vi skapar en pipeline för bearbetning av strömdata. Del 2
Bild 4: Balktransportör

Få tillgång till vår data i BigQuery

Så vi borde redan ha en pipeline igång med data som flödar in i vår tabell. För att testa detta kan vi gå till BigQuery och titta på data. Efter att ha använt kommandot nedan bör du se de första raderna i datamängden. Nu när vi har data lagrad i BigQuery kan vi genomföra ytterligare analyser, samt dela data med kollegor och börja svara på affärsfrågor.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Vi skapar en pipeline för bearbetning av strömdata. Del 2
Figur 5: BigQuery

Slutsats

Vi hoppas att det här inlägget fungerar som ett användbart exempel på att skapa en strömmande datapipeline, samt att hitta sätt att göra data mer tillgänglig. Att lagra data i detta format ger oss många fördelar. Nu kan vi börja svara på viktiga frågor som hur många som använder vår produkt? Växer din användarbas med tiden? Vilka aspekter av produkten interagerar människor mest med? Och finns det fel där det inte borde finnas? Det är dessa frågor som kommer att vara av intresse för organisationen. Baserat på de insikter som kommer fram av svaren på dessa frågor kan vi förbättra produkten och öka användarens engagemang.

Beam är verkligen användbart för den här typen av träning och har ett antal andra intressanta användningsfall också. Till exempel kanske du vill analysera aktieinformation i realtid och göra affärer baserat på analysen, kanske har du sensordata som kommer från fordon och vill beräkna trafiknivåberäkningar. Du kan också till exempel vara ett spelföretag som samlar in användardata och använder den för att skapa instrumentpaneler för att spåra nyckeltal. Okej, mina herrar, detta är ett ämne för ett annat inlägg, tack för att du läste, och för de som vill se hela koden, nedan är länken till min GitHub.

https://github.com/DFoly/User_log_pipeline

Det är allt. Läs del ett.

Källa: will.com

Lägg en kommentar