Dag Allemaal. We delen de vertaling van het laatste deel van het artikel, speciaal opgesteld voor studenten van de cursus.
Apache Beam en DataFlow voor realtime pijplijnen
Google Cloud instellen
Opmerking: ik heb Google Cloud Shell gebruikt om de pijplijn uit te voeren en aangepaste logboekgegevens te publiceren, omdat ik problemen ondervond bij het uitvoeren van de pijplijn in Python 3. Google Cloud Shell gebruikt Python 2, wat consistenter is met Apache Beam.
Om de pijplijn te starten, moeten we een beetje in de instellingen duiken. Voor degenen onder u die GCP nog niet eerder hebben gebruikt, moet u de volgende zes stappen volgen die hierin worden beschreven
Hierna moeten we onze scripts uploaden naar Google Cloud Storage en ze kopiëren naar onze Google Cloud Shel. Uploaden naar cloudopslag is vrij triviaal (een beschrijving is te vinden).
Figuur 2
De opdrachten die we nodig hebben om de bestanden te kopiëren en de vereiste bibliotheken te installeren, staan hieronder vermeld.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Onze database en tabel maken
Nadat we alle installatiegerelateerde stappen hebben voltooid, moeten we eerst een dataset en tabel maken in BigQuery. Er zijn verschillende manieren om dit te doen, maar de eenvoudigste is om de Google Cloud-console te gebruiken door eerst een dataset te maken. U kunt de onderstaande stappen volgen
Figuur 3. Tabelindeling
Publicatie van gebruikersloggegevens
Pub/Sub is een cruciaal onderdeel van onze pijplijn omdat het meerdere onafhankelijke applicaties met elkaar laat communiceren. Het werkt met name als tussenpersoon waarmee we berichten tussen applicaties kunnen verzenden en ontvangen. Het eerste wat we moeten doen is een onderwerp aanmaken. Ga gewoon naar Pub/Sub in de console en klik op MAAK ONDERWERP.
De onderstaande code roept ons script aan om de hierboven gedefinieerde loggegevens te genereren en maakt vervolgens verbinding en verzendt de logs naar Pub/Sub. Het enige dat we hoeven te doen is een object maken UitgeverClient, geef het pad naar het onderwerp op met behulp van de methode topic_path
en roep de functie aan publish
с topic_path
en gegevens. Houd er rekening mee dat wij importeren generate_log_line
uit ons schrift stream_logs
, zorg er dus voor dat deze bestanden in dezelfde map staan, anders krijgt u een importfout. We kunnen dit vervolgens via onze Google Console uitvoeren met behulp van:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Zodra het bestand wordt uitgevoerd, kunnen we de uitvoer van de loggegevens naar de console zien, zoals weergegeven in de onderstaande afbeelding. Dit script werkt zolang we het niet gebruiken CTRL + Com het te voltooien.
Figuur 4. Uitvoer publish_logs.py
Het schrijven van onze pijplijncode
Nu we alles hebben voorbereid, kunnen we beginnen met het leuke gedeelte: het coderen van onze pijplijn met Beam en Python. Om een Beam-pijplijn te maken, moeten we een pijplijnobject (p) maken. Nadat we een pijplijnobject hebben gemaakt, kunnen we meerdere functies achter elkaar toepassen met behulp van de operator pipe (|)
. Over het algemeen ziet de workflow eruit als de onderstaande afbeelding.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
In onze code zullen we twee aangepaste functies maken. Functie regex_clean
, dat de gegevens scant en de overeenkomstige rij ophaalt op basis van de PATTERNS-lijst met behulp van de functie re.search
. De functie retourneert een door komma's gescheiden tekenreeks. Als u geen expert op het gebied van reguliere expressies bent, raad ik u aan dit eens te bekijken datetime
in een functie om deze te laten werken. Ik kreeg een importfout aan het begin van het bestand, wat raar was. Deze lijst wordt vervolgens doorgegeven aan de functie SchrijfnaarBigQuery, waarmee onze gegevens eenvoudigweg aan de tabel worden toegevoegd. De code voor Batch DataFlow Job en Streaming DataFlow Job wordt hieronder gegeven. Het enige verschil tussen batch- en streamingcode is dat we in batch de CSV uitlezen src_path
de functie gebruiken: ReadFromText
van Beam.
Batch DataFlow-taak (batchverwerking)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow-taak (streamverwerking)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Het starten van de transportband
We kunnen de pijpleiding op verschillende manieren laten lopen. Als we wilden, konden we het gewoon lokaal vanaf een terminal uitvoeren terwijl we op afstand bij GCP inlogden.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
We gaan het echter uitvoeren met DataFlow. We kunnen dit doen met behulp van de onderstaande opdracht door de volgende vereiste parameters in te stellen.
project
— ID van uw GCP-project.runner
is een pipeline runner die uw programma analyseert en uw pipeline opbouwt. Om in de cloud te kunnen draaien, moet u een DataflowRunner opgeven.staging_location
— het pad naar de Cloud Dataflow-cloudopslag voor het indexeren van codepakketten die nodig zijn voor de verwerkers die het werk uitvoeren.temp_location
— pad naar de Cloud Dataflow-cloudopslag voor het opslaan van tijdelijke taakbestanden die zijn gemaakt terwijl de pijplijn actief is.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Terwijl deze opdracht wordt uitgevoerd, kunnen we naar het tabblad DataFlow in de Google Console gaan en onze pijplijn bekijken. Als we op de pijplijn klikken, zouden we iets moeten zien dat lijkt op Figuur 4. Voor foutopsporingsdoeleinden kan het erg handig zijn om naar Logboeken en vervolgens naar Stackdriver te gaan om gedetailleerde logboeken te bekijken. Dit heeft mij in een aantal gevallen geholpen bij het oplossen van pijplijnproblemen.
Figuur 4: Balkentransportband
Krijg toegang tot onze gegevens in BigQuery
We zouden dus al een pijplijn moeten hebben met gegevens die onze tabel binnenkomen. Om dit te testen, kunnen we naar BigQuery gaan en de gegevens bekijken. Nadat u de onderstaande opdracht hebt gebruikt, ziet u de eerste paar rijen van de gegevensset. Nu we de gegevens in BigQuery hebben opgeslagen, kunnen we verdere analyses uitvoeren, de gegevens delen met collega's en beginnen met het beantwoorden van zakelijke vragen.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figuur 5: BigQuery
Conclusie
We hopen dat dit bericht dient als een nuttig voorbeeld van het creëren van een streamingdatapijplijn, en van het vinden van manieren om gegevens toegankelijker te maken. Het opslaan van gegevens in dit formaat biedt ons veel voordelen. Nu kunnen we belangrijke vragen gaan beantwoorden, zoals: hoeveel mensen gebruiken ons product? Groeit uw gebruikersbestand in de loop van de tijd? Met welke aspecten van het product hebben mensen het meest interactie? En zijn er fouten die er niet zouden moeten zijn? Dit zijn vragen die voor de organisatie interessant zullen zijn. Op basis van de inzichten die uit de antwoorden op deze vragen naar voren komen, kunnen we het product verbeteren en de gebruikersbetrokkenheid vergroten.
Beam is erg handig voor dit soort oefeningen en heeft ook een aantal andere interessante gebruiksscenario's. U wilt bijvoorbeeld aandelenkoersgegevens in realtime analyseren en transacties uitvoeren op basis van de analyse. Misschien heeft u sensorgegevens van voertuigen en wilt u berekeningen van het verkeersniveau berekenen. U kunt bijvoorbeeld ook een gamingbedrijf zijn dat gebruikersgegevens verzamelt en deze gebruikt om dashboards te maken om belangrijke statistieken bij te houden. Oké heren, dit is een onderwerp voor een ander bericht, bedankt voor het lezen, en voor degenen die de volledige code willen zien, hieronder staat de link naar mijn GitHub.
Dat is alles.
Bron: www.habr.com