Hoi allegearre. Wy diele de oersetting fan it lêste diel fan it artikel, spesifyk taret foar studinten fan 'e kursus.
Apache Beam en DataFlow foar Real-Time Pipelines
Google Cloud ynstelle
Opmerking: ik brûkte Google Cloud Shell om de pipeline út te fieren en oanpaste loggegevens te publisearjen, om't ik problemen hie mei it útfieren fan de pipeline yn Python 3. Google Cloud Shell brûkt Python 2, wat mear konsekwint is mei Apache Beam.
Om de pipeline te begjinnen, moatte wy in bytsje grave yn 'e ynstellingen. Foar dy fan jimme dy't GCP net earder hawwe brûkt, moatte jo de folgjende 6 stappen folgje dy't yn dizze beskreaun binne
Hjirnei moatte wy ús skripts uploade nei Google Cloud Storage en kopiearje nei ús Google Cloud Shel. Uploaden nei wolk opslach is frij triviaal (in beskriuwing kin fûn wurde
Ofbylding 2
De kommando's dy't wy nedich binne om de bestannen te kopiearjen en de fereaske biblioteken te ynstallearjen binne hjirûnder neamd.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
It meitsjen fan ús databank en tabel
Sadree't wy hawwe foltôge alle opset relatearre stappen, it folgjende ding dat wy moatte dwaan is meitsje in dataset en tabel yn BigQuery. D'r binne ferskate manieren om dit te dwaan, mar de ienfâldichste is om de Google Cloud-konsole te brûken troch earst in dataset te meitsjen. Jo kinne de stappen hjirûnder folgje
figuer 3. tabel layout
It publisearjen fan brûkersloggegevens
Pub/Sub is in kritysk ûnderdiel fan ús pipeline, om't it meardere ûnôfhinklike applikaasjes mooglik makket om mei elkoar te kommunisearjen. Benammen wurket it as in tuskenpersoan wêrtroch ús berjochten ferstjoere en ûntfange kinne tusken applikaasjes. It earste ding dat wy moatte dwaan is in ûnderwerp oanmeitsje. Gean gewoan nei Pub / Sub yn 'e konsole en klikje ONDERWERK CREATE.
De koade hjirûnder neamt ús skript om de hjirboppe definieare loggegevens te generearjen en ferbynt en stjoert de logs dan nei Pub/Sub. It iennichste wat wy moatte dwaan is in objekt meitsje PublisherClient, spesifisearje it paad nei it ûnderwerp mei de metoade topic_path
en belje de funksje publish
с topic_path
en gegevens. Tink derom dat wy ymportearje generate_log_line
út ús skript stream_logs
, soargje derfoar dat dizze bestannen yn deselde map steane, oars krije jo in ymportflater. Wy kinne dit dan útfiere fia ús Google-konsole mei:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Sadree't it bestân rint, sille wy de útfier fan 'e loggegevens nei de konsole sjen kinne, lykas werjûn yn' e ôfbylding hjirûnder. Dit skript sil wurkje salang't wy net brûke CTRL + Com it te foltôgjen.
figuer 4. Utfier publish_logs.py
Us pipeline-koade skriuwe
No't wy alles klear hawwe, kinne wy it leuke diel begjinne - ús pipeline kodearje mei Beam en Python. Om in Beam-pipeline te meitsjen, moatte wy in pipeline-objekt (p) meitsje. Sadree't wy hawwe makke in pipeline foarwerp, wy kinne tapasse meardere funksjes ien nei de oare mei help fan de operator pipe (|)
. Yn 't algemien liket de workflow as de ôfbylding hjirûnder.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Yn ús koade sille wy twa oanpaste funksjes oanmeitsje. Funksje regex_clean
, dy't de gegevens scant en de oerienkommende rige ophelje op basis fan de PATTERNS list mei de funksje re.search
. De funksje jout in komma-skieden tekenrige werom. As jo gjin ekspert op reguliere ekspresje binne, advisearje ik dit te kontrolearjen datetime
binnen in funksje om it te wurkjen. Ik krige in ymportflater oan it begjin fan it bestân, dat wie nuver. Dizze list wurdt dan trochjûn oan de funksje WriteToBigQuery, dy't gewoan ús gegevens taheakket oan 'e tabel. De koade foar Batch DataFlow Job en Streaming DataFlow Job wurdt hjirûnder jûn. It ienige ferskil tusken batch en streaming koade is dat yn batch wy lêze de CSV út src_path
mei help fan de funksje ReadFromText
fan Beam.
Batch DataFlow Job (batchferwurking)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (streamferwurking)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Begjin fan de transportband
Wy kinne de pipeline op ferskate ferskillende manieren útfiere. As wy woenen, koenen wy it gewoan lokaal útfiere fan in terminal by it oanmelden by GCP op ôfstân.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Wy sille it lykwols útfiere mei DataFlow. Wy kinne dit dwaan mei it ûnderste kommando troch de folgjende fereaske parameters yn te stellen.
project
- ID fan jo GCP-projekt.runner
is in pipeline runner dy't jo programma sil analysearje en jo pipeline sil konstruearje. Om yn 'e wolk te rinnen, moatte jo in DataflowRunner opjaan.staging_location
- it paad nei de Cloud Dataflow-wolkopslach foar yndeksearjen fan koadepakketten dy't nedich binne troch de processors dy't it wurk útfiere.temp_location
- paad nei de Cloud Dataflow-wolkopslach foar it bewarjen fan tydlike taakbestannen makke wylst de pipeline rint.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Wylst dit kommando rint, kinne wy nei it tabblêd DataFlow yn 'e google-konsole gean en ús pipeline besjen. As wy op 'e pipeline klikke, soene wy wat sjen moatte dat liket op figuer 4. Foar debuggen kin it tige nuttich wêze om nei Logs te gean en dan nei Stackdriver om detaillearre logs te besjen. Dit hat my holpen by it oplossen fan pipelineproblemen yn in oantal gefallen.
figuer 4: Beam conveyor
Tagong ta ús gegevens yn BigQuery
Dat, wy moatte al in pipeline hawwe dy't rint mei gegevens dy't yn ús tabel streame. Om dit te testen, kinne wy nei BigQuery gean en de gegevens besjen. Nei it brûken fan it kommando hjirûnder moatte jo de earste pear rigen fan 'e dataset sjen. No't wy de gegevens hawwe opslein yn BigQuery, kinne wy fierdere analyse útfiere, lykas de gegevens diele mei kollega's en begjinne mei it beantwurdzjen fan saaklike fragen.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
figuer 5: BigQuery
konklúzje
Wy hoopje dat dit berjocht tsjinnet as in nuttich foarbyld fan it meitsjen fan in streaminggegevenspipeline, lykas ek manieren fine om gegevens tagonkliker te meitsjen. It opslaan fan gegevens yn dit formaat jout ús in protte foardielen. No kinne wy begjinne mei it beantwurdzjen fan wichtige fragen lykas hoefolle minsken ús produkt brûke? Groeit jo brûkersbasis oer de tiid? Mei hokker aspekten fan it produkt omgean minsken it meast? En binne d'r flaters wêr't net wêze moatte? Dit binne de fragen dy't ynteressearje sille foar de organisaasje. Op grûn fan de ynsjoggen dy't fuortkomme út 'e antwurden op dizze fragen, kinne wy it produkt ferbetterje en de belutsenens fan brûkers ferheegje.
Beam is echt nuttich foar dit soarte fan oefening en hat ek in oantal oare nijsgjirrige gebrûk gefallen. Jo kinne bygelyks stock tickgegevens yn realtime analysearje en hannelingen meitsje op basis fan 'e analyse, miskien hawwe jo sensorgegevens dy't komme fan auto's en wolle berekkeningen fan ferkearsnivo berekkenje. Jo kinne bygelyks ek in gamingbedriuw wêze dat brûkersgegevens sammelt en brûkt om dashboards te meitsjen om wichtige metriken te folgjen. Okee, hearen, dit is in ûnderwerp foar in oare post, tank foar it lêzen, en foar dyjingen dy't de folsleine koade wolle sjen, hjirûnder is de keppeling nei myn GitHub.
Da's alles.
Boarne: www.habr.com