Bok svima. Dijelimo prijevod završnog dijela članka, pripremljen posebno za studente kolegija.
Apache Beam i DataFlow za cjevovode u stvarnom vremenu
Postavljanje Google Clouda
Napomena: koristio sam Google Cloud Shell za pokretanje cjevovoda i objavljivanje prilagođenih podataka dnevnika jer sam imao problema s pokretanjem cjevovoda u Pythonu 3. Google Cloud Shell koristi Python 2, koji je dosljedniji s Apache Beamom.
Da bismo pokrenuli cjevovod, moramo malo kopati po postavkama. Za one od vas koji prije nisu koristili GCP, morat ćete slijediti sljedećih 6 koraka opisanih u ovom
Nakon toga, morat ćemo učitati naše skripte u Google Cloud Storage i kopirati ih u naš Google Cloud Shel. Prijenos u pohranu u oblaku prilično je trivijalan (možete pronaći opis
Slika 2
Naredbe koje su nam potrebne za kopiranje datoteka i instaliranje potrebnih biblioteka navedene su u nastavku.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Izrada naše baze podataka i tablice
Nakon što smo dovršili sve korake povezane s postavljanjem, sljedeća stvar koju trebamo učiniti je stvoriti skup podataka i tablicu u BigQueryju. Postoji nekoliko načina da to učinite, ali najjednostavniji je koristiti Google Cloud konzolu tako da prvo napravite skup podataka. Možete slijediti korake u nastavku
Slika 3. Izgled tablice
Objavljivanje podataka korisničkog dnevnika
Pub/Sub je kritična komponenta našeg cjevovoda jer omogućuje međusobnu komunikaciju više neovisnih aplikacija. Točnije, radi kao posrednik koji nam omogućuje slanje i primanje poruka između aplikacija. Prvo što trebamo učiniti je stvoriti temu. Jednostavno idite na Pub/Sub u konzoli i kliknite STVORI TEMU.
Kôd u nastavku poziva našu skriptu za generiranje gore definiranih podataka zapisnika, a zatim se povezuje i šalje zapisnike u Pub/Sub. Jedina stvar koju trebamo učiniti je stvoriti objekt PublisherClient, odredite put do teme pomoću metode topic_path
i pozvati funkciju publish
с topic_path
i podataka. Napominjemo da uvozimo generate_log_line
iz našeg scenarija stream_logs
, stoga provjerite jesu li ove datoteke u istoj mapi, inače ćete dobiti pogrešku pri uvozu. Zatim to možemo pokrenuti putem naše google konzole koristeći:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Čim se datoteka pokrene, moći ćemo vidjeti izlaz podataka dnevnika na konzolu, kao što je prikazano na slici ispod. Ova skripta će raditi sve dok je ne koristimo CTRL + Cda ga dovršim.
Slika 4. Izlaz publish_logs.py
Pisanje našeg koda cjevovoda
Sada kada smo sve pripremili, možemo započeti zabavni dio - kodiranje našeg cjevovoda pomoću Beama i Pythona. Da bismo stvorili Beam cjevovod, moramo stvoriti objekt cjevovoda (p). Nakon što smo stvorili objekt cjevovoda, možemo primijeniti više funkcija jednu za drugom pomoću operatora pipe (|)
. Općenito, tijek rada izgleda kao na slici ispod.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
U našem kodu stvorit ćemo dvije prilagođene funkcije. Funkcija regex_clean
, koji skenira podatke i dohvaća odgovarajući red na temelju popisa PATTERNS pomoću funkcije re.search
. Funkcija vraća niz odvojen zarezom. Ako niste stručnjak za regularne izraze, preporučujem da pogledate ovo datetime
unutar funkcije kako bi ona radila. Dobivao sam pogrešku pri uvozu na početku datoteke, što je bilo čudno. Ovaj popis se zatim prosljeđuje funkciji WriteToBigQuery, koji jednostavno dodaje naše podatke u tablicu. Kôd za Batch DataFlow Job i Streaming DataFlow Job naveden je u nastavku. Jedina razlika između batch i streaming koda je da u batchu čitamo CSV src_path
koristeći funkciju ReadFromText
od Beama.
Batch DataFlow Job (skupna obrada)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (stream obrada)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Pokretanje pokretne trake
Cjevovod možemo pokrenuti na nekoliko različitih načina. Da smo htjeli, mogli bismo ga jednostavno pokrenuti lokalno s terminala dok se daljinski prijavljujemo na GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Međutim, mi ćemo ga pokrenuti koristeći DataFlow. To možemo učiniti pomoću donje naredbe postavljanjem sljedećih potrebnih parametara.
project
— ID vašeg GCP projekta.runner
je cjevovod koji će analizirati vaš program i konstruirati vaš cjevovod. Za rad u oblaku morate navesti DataflowRunner.staging_location
— put do pohrane u oblaku Cloud Dataflow za indeksiranje paketa kodova potrebnih procesorima koji obavljaju posao.temp_location
— put do Cloud Dataflow pohrane u oblaku za pohranu privremenih datoteka poslova stvorenih dok cjevovod radi.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Dok se ova naredba izvodi, možemo otići na karticu DataFlow na google konzoli i vidjeti naš cjevovod. Kada kliknemo na cjevovod, trebali bismo vidjeti nešto slično kao na slici 4. Za potrebe otklanjanja pogrešaka, može biti od velike pomoći otići na Dnevnike, a zatim na Stackdriver za pregled detaljnih zapisnika. To mi je pomoglo u rješavanju problema s cjevovodom u brojnim slučajevima.
Slika 4: Transportna greda
Pristupite našim podacima u BigQueryju
Dakle, već bismo trebali imati pokrenut cjevovod s podacima koji teku u našu tablicu. Da bismo ovo testirali, možemo otići na BigQuery i pogledati podatke. Nakon korištenja donje naredbe trebali biste vidjeti prvih nekoliko redaka skupa podataka. Sada kada imamo podatke pohranjene u BigQueryju, možemo provesti daljnju analizu, kao i podijeliti podatke s kolegama i početi odgovarati na poslovna pitanja.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Slika 5: BigQuery
Zaključak
Nadamo se da će ovaj post poslužiti kao koristan primjer stvaranja cjevovoda za strujanje podataka, kao i pronalaženja načina da podaci budu dostupniji. Pohranjivanje podataka u ovom formatu daje nam mnoge prednosti. Sada možemo početi odgovarati na važna pitanja kao što je koliko ljudi koristi naš proizvod? Raste li vaša baza korisnika s vremenom? S kojim aspektima proizvoda ljudi najviše komuniciraju? A ima li grešaka tamo gdje ih ne bi trebalo biti? To su pitanja koja će biti od interesa za organizaciju. Na temelju spoznaja koje proizlaze iz odgovora na ova pitanja, možemo poboljšati proizvod i povećati angažman korisnika.
Beam je stvarno koristan za ovu vrstu vježbi, a ima i niz drugih zanimljivih slučajeva upotrebe. Na primjer, možda želite analizirati podatke o dionicama u stvarnom vremenu i trgovati na temelju analize, možda imate podatke senzora koji dolaze iz vozila i želite izračunati izračune razine prometa. Možete također, na primjer, biti tvrtka za igre na sreću koja prikuplja korisničke podatke i koristi ih za izradu nadzornih ploča za praćenje ključnih metrika. U redu, gospodo, ovo je tema za drugi post, hvala na čitanju, a za one koji žele vidjeti cijeli kod, ispod je poveznica na moj GitHub.
To je sve.
Izvor: www.habr.com