Vi lager en strømdatabehandlingspipeline. Del 2

Hei alle sammen. Vi deler oversettelsen av den siste delen av artikkelen, utarbeidet spesielt for studenter på kurset. Dataingeniør. Du kan lese den første delen her.

Apache Beam og DataFlow for sanntidsrørledninger

Vi lager en strømdatabehandlingspipeline. Del 2

Sette opp Google Cloud

Merk: Jeg brukte Google Cloud Shell til å kjøre pipelinen og publisere tilpassede loggdata fordi jeg hadde problemer med å kjøre pipelinen i Python 3. Google Cloud Shell bruker Python 2, som er mer konsistent med Apache Beam.

For å starte rørledningen må vi grave litt i innstillingene. For de av dere som ikke har brukt GCP før, må du følge de følgende 6 trinnene som er skissert i denne side.

Etter dette må vi laste opp skriptene våre til Google Cloud Storage og kopiere dem til Google Cloud Shel. Opplasting til skylagring er ganske trivielt (en beskrivelse kan bli funnet her). For å kopiere filene våre kan vi åpne Google Cloud Shel fra verktøylinjen ved å klikke på det første ikonet til venstre i figur 2 nedenfor.

Vi lager en strømdatabehandlingspipeline. Del 2
Figur 2

Kommandoene vi trenger for å kopiere filene og installere de nødvendige bibliotekene er oppført nedenfor.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Opprette vår database og tabell

Når vi har fullført alle oppsettrelaterte trinn, er det neste vi må gjøre å lage et datasett og en tabell i BigQuery. Det er flere måter å gjøre dette på, men den enkleste er å bruke Google Cloud-konsollen ved først å lage et datasett. Du kan følge trinnene nedenfor linkfor å lage en tabell med et skjema. Bordet vårt vil ha 7 kolonner, som tilsvarer komponentene i hver brukerlogg. For enkelhets skyld vil vi definere alle kolonner som strenger, bortsett fra den tidslokale variabelen, og navngi dem i henhold til variablene vi genererte tidligere. Oppsettet til bordet vårt skal se ut som i figur 3.

Vi lager en strømdatabehandlingspipeline. Del 2
Figur 3. Tabelloppsett

Publisering av brukerloggdata

Pub/Sub er en kritisk komponent i vår pipeline fordi den lar flere uavhengige applikasjoner kommunisere med hverandre. Spesielt fungerer den som et mellomledd som lar oss sende og motta meldinger mellom applikasjoner. Det første vi må gjøre er å lage et emne. Bare gå til Pub/Sub i konsollen og klikk LAG TEMNE.

Koden nedenfor kaller skriptet vårt for å generere loggdataene definert ovenfor og kobler deretter loggene til Pub/Sub. Det eneste vi trenger å gjøre er å lage et objekt PublisherClient, spesifiser banen til emnet ved hjelp av metoden topic_path og kall opp funksjonen publish с topic_path og data. Vær oppmerksom på at vi importerer generate_log_line fra manuset vårt stream_logs, så sørg for at disse filene er i samme mappe, ellers får du en importfeil. Vi kan deretter kjøre dette gjennom Google-konsollen vår ved å bruke:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Så snart filen kjører, vil vi kunne se utdataene fra loggdataene til konsollen, som vist i figuren nedenfor. Dette skriptet vil fungere så lenge vi ikke bruker det CTRL + Cfor å fullføre den.

Vi lager en strømdatabehandlingspipeline. Del 2
Figur 4. Utgang publish_logs.py

Skriver pipelinekoden vår

Nå som vi har alt forberedt, kan vi starte den morsomme delen - koding av rørledningen vår ved hjelp av Beam og Python. For å lage en Beam-rørledning, må vi lage et rørledningsobjekt (p). Når vi har opprettet et pipeline-objekt, kan vi bruke flere funksjoner etter hverandre ved hjelp av operatøren pipe (|). Generelt ser arbeidsflyten ut som bildet nedenfor.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

I koden vår vil vi lage to tilpassede funksjoner. Funksjon regex_clean, som skanner dataene og henter den tilsvarende raden basert på PATTERNS-listen ved hjelp av funksjonen re.search. Funksjonen returnerer en kommaseparert streng. Hvis du ikke er ekspert på regulære uttrykk, anbefaler jeg å sjekke ut dette opplæringen og øv i en notisblokk for å sjekke koden. Etter dette definerer vi en egendefinert ParDo-funksjon kalt Dele, som er en variant av Beam-transformasjonen for parallell prosessering. I Python gjøres dette på en spesiell måte – vi må lage en klasse som arver fra DoFn Beam-klassen. Split-funksjonen tar den analyserte raden fra forrige funksjon og returnerer en liste over ordbøker med nøkler som tilsvarer kolonnenavnene i BigQuery-tabellen vår. Det er noe å merke seg om denne funksjonen: Jeg måtte importere datetime inne i en funksjon for å få den til å fungere. Jeg fikk en importfeil i begynnelsen av filen, noe som var rart. Denne listen sendes deretter til funksjonen WriteToBigQuery, som ganske enkelt legger til dataene våre i tabellen. Koden for Batch DataFlow Job og Streaming DataFlow Job er gitt nedenfor. Den eneste forskjellen mellom batch og strømmekode er at vi leser CSV fra batch src_pathved å bruke funksjonen ReadFromText fra Beam.

Batch DataFlow Job (batchbehandling)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (strømbehandling)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Starter transportøren

Vi kan kjøre rørledningen på flere forskjellige måter. Hvis vi ville, kunne vi bare kjøre det lokalt fra en terminal mens vi logger på GCP eksternt.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Vi skal imidlertid kjøre det ved hjelp av DataFlow. Vi kan gjøre dette ved å bruke kommandoen nedenfor ved å angi følgende nødvendige parametere.

  • project — ID for GCP-prosjektet ditt.
  • runner er en pipeline runner som vil analysere programmet og konstruere pipeline. For å kjøre i skyen må du spesifisere en DataflowRunner.
  • staging_location — banen til Cloud Dataflow-skylagringen for indeksering av kodepakker som trengs av prosessorene som utfører arbeidet.
  • temp_location — bane til Cloud Dataflow-skylagringen for lagring av midlertidige jobbfiler opprettet mens rørledningen kjører.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Mens denne kommandoen kjører, kan vi gå til DataFlow-fanen i Google-konsollen og se vår pipeline. Når vi klikker på rørledningen, skal vi se noe som ligner på figur 4. For feilsøkingsformål kan det være svært nyttig å gå til Logger og deretter til Stackdriver for å se de detaljerte loggene. Dette har hjulpet meg med å løse rørledningsproblemer i en rekke tilfeller.

Vi lager en strømdatabehandlingspipeline. Del 2
Figur 4: Bjelketransportør

Få tilgang til våre data i BigQuery

Så vi burde allerede ha en pipeline som kjører med data som strømmer inn i tabellen vår. For å teste dette kan vi gå til BigQuery og se på dataene. Etter å ha brukt kommandoen nedenfor bør du se de første par radene i datasettet. Nå som vi har dataene lagret i BigQuery, kan vi gjennomføre ytterligere analyser, samt dele dataene med kolleger og begynne å svare på forretningsspørsmål.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Vi lager en strømdatabehandlingspipeline. Del 2
Figur 5: BigQuery

Konklusjon

Vi håper dette innlegget fungerer som et nyttig eksempel på å lage en strømmedatapipeline, i tillegg til å finne måter å gjøre data mer tilgjengelig på. Lagring av data i dette formatet gir oss mange fordeler. Nå kan vi begynne å svare på viktige spørsmål som hvor mange som bruker produktet vårt? Vokser brukerbasen din over tid? Hvilke aspekter av produktet samhandler folk mest med? Og er det feil der det ikke burde være? Dette er spørsmålene som vil være av interesse for organisasjonen. Basert på innsikten som kommer frem av svarene på disse spørsmålene, kan vi forbedre produktet og øke brukerengasjementet.

Beam er veldig nyttig for denne typen trening og har en rekke andre interessante bruksområder også. Det kan for eksempel være lurt å analysere aksjetickdata i sanntid og gjøre handler basert på analysen, kanskje du har sensordata som kommer fra kjøretøy og ønsker å beregne trafikknivåberegninger. Du kan for eksempel også være et spillselskap som samler inn brukerdata og bruker dem til å lage dashboards for å spore nøkkeltall. Ok, mine herrer, dette er et emne for et annet innlegg, takk for at du leste, og for de som vil se hele koden, nedenfor er lenken til min GitHub.

https://github.com/DFoly/User_log_pipeline

Det er alt. Les del én.

Kilde: www.habr.com

Legg til en kommentar