We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2

Dag Allemaal. We delen de vertaling van het laatste deel van het artikel, speciaal opgesteld voor studenten van de cursus. Gegevens ingenieur. Je kunt het eerste deel lezen hier.

Apache Beam en DataFlow voor realtime pijplijnen

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2

Google Cloud instellen

Opmerking: ik heb Google Cloud Shell gebruikt om de pijplijn uit te voeren en aangepaste logboekgegevens te publiceren, omdat ik problemen ondervond bij het uitvoeren van de pijplijn in Python 3. Google Cloud Shell gebruikt Python 2, wat consistenter is met Apache Beam.

Om de pijplijn te starten, moeten we een beetje in de instellingen duiken. Voor degenen onder u die GCP nog niet eerder hebben gebruikt, moet u de volgende zes stappen volgen die hierin worden beschreven pagina.

Hierna moeten we onze scripts uploaden naar Google Cloud Storage en ze kopiëren naar onze Google Cloud Shel. Uploaden naar cloudopslag is vrij triviaal (een beschrijving is te vinden). hier). Om onze bestanden te kopiëren, kunnen we Google Cloud Shel openen vanuit de werkbalk door op het eerste pictogram aan de linkerkant in Figuur 2 hieronder te klikken.

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2
Figuur 2

De opdrachten die we nodig hebben om de bestanden te kopiëren en de vereiste bibliotheken te installeren, staan ​​hieronder vermeld.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Onze database en tabel maken

Nadat we alle installatiegerelateerde stappen hebben voltooid, moeten we eerst een dataset en tabel maken in BigQuery. Er zijn verschillende manieren om dit te doen, maar de eenvoudigste is om de Google Cloud-console te gebruiken door eerst een dataset te maken. U kunt de onderstaande stappen volgen linkom een ​​tabel met een schema te maken. Onze tafel zal hebben 7 kolommen, overeenkomend met de componenten van elk gebruikerslogboek. Voor het gemak definiëren we alle kolommen als tekenreeksen, behalve de tijdlokale variabele, en geven we ze een naam op basis van de variabelen die we eerder hebben gegenereerd. De lay-out van onze tabel zou er uit moeten zien als in Figuur 3.

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2
Figuur 3. Tabelindeling

Publicatie van gebruikersloggegevens

Pub/Sub is een cruciaal onderdeel van onze pijplijn omdat het meerdere onafhankelijke applicaties met elkaar laat communiceren. Het werkt met name als tussenpersoon waarmee we berichten tussen applicaties kunnen verzenden en ontvangen. Het eerste wat we moeten doen is een onderwerp aanmaken. Ga gewoon naar Pub/Sub in de console en klik op MAAK ONDERWERP.

De onderstaande code roept ons script aan om de hierboven gedefinieerde loggegevens te genereren en maakt vervolgens verbinding en verzendt de logs naar Pub/Sub. Het enige dat we hoeven te doen is een object maken UitgeverClient, geef het pad naar het onderwerp op met behulp van de methode topic_path en roep de functie aan publish с topic_path en gegevens. Houd er rekening mee dat wij importeren generate_log_line uit ons schrift stream_logs, zorg er dus voor dat deze bestanden in dezelfde map staan, anders krijgt u een importfout. We kunnen dit vervolgens via onze Google Console uitvoeren met behulp van:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Zodra het bestand wordt uitgevoerd, kunnen we de uitvoer van de loggegevens naar de console zien, zoals weergegeven in de onderstaande afbeelding. Dit script werkt zolang we het niet gebruiken CTRL + Com het te voltooien.

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2
Figuur 4. Uitvoer publish_logs.py

Het schrijven van onze pijplijncode

Nu we alles hebben voorbereid, kunnen we beginnen met het leuke gedeelte: het coderen van onze pijplijn met Beam en Python. Om een ​​Beam-pijplijn te maken, moeten we een pijplijnobject (p) maken. Nadat we een pijplijnobject hebben gemaakt, kunnen we meerdere functies achter elkaar toepassen met behulp van de operator pipe (|). Over het algemeen ziet de workflow eruit als de onderstaande afbeelding.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

In onze code zullen we twee aangepaste functies maken. Functie regex_clean, dat de gegevens scant en de overeenkomstige rij ophaalt op basis van de PATTERNS-lijst met behulp van de functie re.search. De functie retourneert een door komma's gescheiden tekenreeks. Als u geen expert op het gebied van reguliere expressies bent, raad ik u aan dit eens te bekijken zelfstudie en oefen in een notitieblok om de code te controleren. Hierna definiëren we een aangepaste ParDo-functie genaamd Split, wat een variatie is op de Beam-transformatie voor parallelle verwerking. In Python gebeurt dit op een speciale manier: we moeten een klasse maken die overerft van de DoFn Beam-klasse. De functie Splitsen neemt de geparseerde rij uit de vorige functie en retourneert een lijst met woordenboeken met sleutels die overeenkomen met de kolomnamen in onze BigQuery-tabel. Er is iets op te merken over deze functie: ik moest importeren datetime in een functie om deze te laten werken. Ik kreeg een importfout aan het begin van het bestand, wat raar was. Deze lijst wordt vervolgens doorgegeven aan de functie SchrijfnaarBigQuery, waarmee onze gegevens eenvoudigweg aan de tabel worden toegevoegd. De code voor Batch DataFlow Job en Streaming DataFlow Job wordt hieronder gegeven. Het enige verschil tussen batch- en streamingcode is dat we in batch de CSV uitlezen src_pathde functie gebruiken: ReadFromText van Beam.

Batch DataFlow-taak (batchverwerking)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow-taak (streamverwerking)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Het starten van de transportband

We kunnen de pijpleiding op verschillende manieren laten lopen. Als we wilden, konden we het gewoon lokaal vanaf een terminal uitvoeren terwijl we op afstand bij GCP inlogden.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

We gaan het echter uitvoeren met DataFlow. We kunnen dit doen met behulp van de onderstaande opdracht door de volgende vereiste parameters in te stellen.

  • project — ID van uw GCP-project.
  • runner is een pipeline runner die uw programma analyseert en uw pipeline opbouwt. Om in de cloud te kunnen draaien, moet u een DataflowRunner opgeven.
  • staging_location — het pad naar de Cloud Dataflow-cloudopslag voor het indexeren van codepakketten die nodig zijn voor de verwerkers die het werk uitvoeren.
  • temp_location — pad naar de Cloud Dataflow-cloudopslag voor het opslaan van tijdelijke taakbestanden die zijn gemaakt terwijl de pijplijn actief is.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Terwijl deze opdracht wordt uitgevoerd, kunnen we naar het tabblad DataFlow in de Google Console gaan en onze pijplijn bekijken. Als we op de pijplijn klikken, zouden we iets moeten zien dat lijkt op Figuur 4. Voor foutopsporingsdoeleinden kan het erg handig zijn om naar Logboeken en vervolgens naar Stackdriver te gaan om gedetailleerde logboeken te bekijken. Dit heeft mij in een aantal gevallen geholpen bij het oplossen van pijplijnproblemen.

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2
Figuur 4: Balkentransportband

Krijg toegang tot onze gegevens in BigQuery

We zouden dus al een pijplijn moeten hebben met gegevens die onze tabel binnenkomen. Om dit te testen, kunnen we naar BigQuery gaan en de gegevens bekijken. Nadat u de onderstaande opdracht hebt gebruikt, ziet u de eerste paar rijen van de gegevensset. Nu we de gegevens in BigQuery hebben opgeslagen, kunnen we verdere analyses uitvoeren, de gegevens delen met collega's en beginnen met het beantwoorden van zakelijke vragen.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

We creëren een pijplijn voor de verwerking van stroomgegevens. Deel 2
Figuur 5: BigQuery

Conclusie

We hopen dat dit bericht dient als een nuttig voorbeeld van het creëren van een streamingdatapijplijn, en van het vinden van manieren om gegevens toegankelijker te maken. Het opslaan van gegevens in dit formaat biedt ons veel voordelen. Nu kunnen we belangrijke vragen gaan beantwoorden, zoals: hoeveel mensen gebruiken ons product? Groeit uw gebruikersbestand in de loop van de tijd? Met welke aspecten van het product hebben mensen het meest interactie? En zijn er fouten die er niet zouden moeten zijn? Dit zijn vragen die voor de organisatie interessant zullen zijn. Op basis van de inzichten die uit de antwoorden op deze vragen naar voren komen, kunnen we het product verbeteren en de gebruikersbetrokkenheid vergroten.

Beam is erg handig voor dit soort oefeningen en heeft ook een aantal andere interessante gebruiksscenario's. U wilt bijvoorbeeld aandelenkoersgegevens in realtime analyseren en transacties uitvoeren op basis van de analyse. Misschien heeft u sensorgegevens van voertuigen en wilt u berekeningen van het verkeersniveau berekenen. U kunt bijvoorbeeld ook een gamingbedrijf zijn dat gebruikersgegevens verzamelt en deze gebruikt om dashboards te maken om belangrijke statistieken bij te houden. Oké heren, dit is een onderwerp voor een ander bericht, bedankt voor het lezen, en voor degenen die de volledige code willen zien, hieronder staat de link naar mijn GitHub.

https://github.com/DFoly/User_log_pipeline

Dat is alles. Lees deel één.

Bron: www.habr.com

Voeg een reactie