Zdravo svima. Delimo prevod završnog dela članka, pripremljen posebno za studente kursa.
Apache Beam i DataFlow za cjevovode u realnom vremenu
Postavljanje Google Clouda
Napomena: Koristio sam Google Cloud Shell da pokrenem cjevovod i objavim prilagođene podatke dnevnika jer sam imao problema s pokretanjem cjevovoda u Pythonu 3. Google Cloud Shell koristi Python 2, koji je više konzistentan sa Apache Beamom.
Da bismo pokrenuli cevovod, trebamo malo kopati u postavkama. Za one od vas koji ranije niste koristili GCP, morat ćete slijediti sljedećih 6 koraka navedenih u ovome
Nakon ovoga, morat ćemo otpremiti naše skripte u Google Cloud Storage i kopirati ih u naš Google Cloud Shel. Otpremanje u pohranu u oblaku je prilično trivijalno (možete pronaći opis
2 Figure
Komande koje su nam potrebne za kopiranje datoteka i instaliranje potrebnih biblioteka su navedene u nastavku.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Kreiranje naše baze podataka i tabele
Nakon što završimo sve korake vezane za podešavanje, sljedeća stvar koju trebamo učiniti je kreirati skup podataka i tablicu u BigQueryju. Postoji nekoliko načina da to učinite, ali najjednostavniji je korištenje Google Cloud konzole tako što ćete prvo kreirati skup podataka. Možete slijediti dolje navedene korake
Slika 3. Izgled tabele
Objavljivanje podataka dnevnika korisnika
Pub/Sub je kritična komponenta našeg cevovoda jer omogućava više nezavisnih aplikacija da međusobno komuniciraju. Konkretno, radi kao posrednik koji nam omogućava slanje i primanje poruka između aplikacija. Prvo što treba da uradimo je da kreiramo temu. Jednostavno idite na Pub/Sub na konzoli i kliknite KREIRAJ TEMU.
Kod u nastavku poziva našu skriptu za generiranje podataka dnevnika definiranih gore, a zatim se povezuje i šalje dnevnike u Pub/Sub. Jedino što treba da uradimo je da kreiramo objekat PublisherClient, navedite putanju do teme koristeći metodu topic_path
i pozovite funkciju publish
с topic_path
i podatke. Napominjemo da uvozimo generate_log_line
iz našeg scenarija stream_logs
, pa provjerite jesu li ove datoteke u istom folderu, inače ćete dobiti grešku pri uvozu. Zatim ovo možemo pokrenuti kroz našu google konzolu koristeći:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Čim se datoteka pokrene, moći ćemo vidjeti izlaz podataka dnevnika na konzolu, kao što je prikazano na donjoj slici. Ova skripta će raditi sve dok je ne koristimo CTRL + Cda ga dovršim.
Slika 4. Izlaz publish_logs.py
Pisanje našeg koda za cjevovod
Sada kada smo sve pripremili, možemo početi sa zabavnim dijelom - kodiranjem našeg cjevovoda koristeći Beam i Python. Da bismo kreirali cevovod Beam, moramo kreirati objekat cjevovoda (p). Nakon što smo kreirali objekt cjevovoda, možemo primijeniti više funkcija jednu za drugom koristeći operator pipe (|)
. Općenito, tok posla izgleda kao na slici ispod.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
U našem kodu ćemo kreirati dvije prilagođene funkcije. Funkcija regex_clean
, koji skenira podatke i dohvaća odgovarajući red na osnovu liste PATTERNS koristeći funkciju re.search
. Funkcija vraća string odvojen zarezom. Ako niste stručnjak za regularne izraze, preporučujem da pogledate ovo datetime
unutar funkcije kako bi ona funkcionirala. Dobivao sam grešku pri uvozu na početku datoteke, što je bilo čudno. Ova lista se zatim prosljeđuje funkciji WriteToBigQuery, koji jednostavno dodaje naše podatke u tabelu. Kôd za Batch DataFlow posao i Streaming DataFlow posao je dat ispod. Jedina razlika između paketnog koda i koda za striming je u tome što u paketu čitamo CSV iz src_path
koristeći funkciju ReadFromText
from Beam.
Batch DataFlow posao (skupna obrada)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow posao (obrada toka)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Pokretanje cjevovoda
Cjevovod možemo voditi na nekoliko različitih načina. Ako želimo, mogli bismo ga samo pokrenuti lokalno s terminala dok se daljinski prijavljivali na GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Međutim, mi ćemo ga pokrenuti koristeći DataFlow. To možemo učiniti pomoću donje naredbe postavljanjem sljedećih potrebnih parametara.
project
— ID vašeg GCP projekta.runner
je cevovod koji će analizirati vaš program i konstruisati vaš cevovod. Da biste radili u oblaku, morate navesti DataflowRunner.staging_location
— putanja do Cloud Dataflow skladišta u oblaku za indeksiranje paketa kodova potrebnih procesorima koji obavljaju posao.temp_location
— putanja do Cloud Dataflow skladišta u oblaku za pohranjivanje privremenih datoteka poslova kreiranih dok je cjevovod pokrenut.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Dok je ova naredba pokrenuta, možemo otići na karticu DataFlow u google konzoli i vidjeti naš cjevovod. Kada kliknemo na cevovod, trebalo bi da vidimo nešto slično kao na slici 4. Za potrebe otklanjanja grešaka, može biti od velike pomoći da odete na Logs, a zatim na Stackdriver da vidite detaljne evidencije. Ovo mi je pomoglo da riješim probleme s plinovodom u brojnim slučajevima.
Slika 4: Transporter sa gredama
Pristupite našim podacima u BigQueryju
Dakle, već bismo trebali imati cjevovod koji radi s podacima koji teku u našu tabelu. Da bismo ovo testirali, možemo otići na BigQuery i pogledati podatke. Nakon upotrebe naredbe ispod, trebali biste vidjeti prvih nekoliko redova skupa podataka. Sada kada imamo podatke pohranjene u BigQueryju, možemo vršiti dalju analizu, kao i podijeliti podatke sa kolegama i početi odgovarati na poslovna pitanja.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Slika 5: BigQuery
zaključak
Nadamo se da će ovaj post poslužiti kao koristan primjer za kreiranje streaming podataka, kao i za pronalaženje načina da podaci budu dostupniji. Pohranjivanje podataka u ovom formatu daje nam mnoge prednosti. Sada možemo početi odgovarati na važna pitanja poput koliko ljudi koristi naš proizvod? Raste li vaša korisnička baza s vremenom? S kojim aspektima proizvoda ljudi najviše komuniciraju? I ima li grešaka tamo gdje ih ne bi trebalo biti? Ovo su pitanja koja će zanimati organizaciju. Na osnovu uvida koji proizlaze iz odgovora na ova pitanja, možemo poboljšati proizvod i povećati angažman korisnika.
Beam je zaista koristan za ovu vrstu vježbe, a ima i niz drugih zanimljivih slučajeva korištenja. Na primjer, možda ćete htjeti analizirati podatke o zalihama u realnom vremenu i trgovati na osnovu analize, možda imate podatke senzora koji dolaze iz vozila i želite izračunati kalkulacije nivoa prometa. Možete, na primjer, biti i kompanija za igre koja prikuplja korisničke podatke i koristi ih za kreiranje nadzornih ploča za praćenje ključnih metrika. Dobro, gospodo, ovo je tema za još jedan post, hvala na čitanju, a za one koji žele da vide ceo kod, ispod je link na moj GitHub.
To je sve.
izvor: www.habr.com