Hei kaikki. Jaamme artikkelin viimeisen osan käännöksen, joka on valmistettu erityisesti kurssin opiskelijoille.
Apache Beam ja DataFlow reaaliaikaisia putkia varten
Google Cloudin määrittäminen
Huomautus: Käytin Google Cloud Shelliä liukuhihnan suorittamiseen ja mukautettujen lokitietojen julkaisemiseen, koska minulla oli ongelmia liukuhihnan suorittamisessa Python 3:ssa. Google Cloud Shell käyttää Python 2:ta, joka on yhdenmukaisempi Apache Beamin kanssa.
Liukulinjan aloittamiseksi meidän on kaivettava hieman asetuksia. Niiden teistä, jotka eivät ole aiemmin käyttäneet GCP:tä, sinun on noudatettava tässä kuvattuja 6 vaihetta
Tämän jälkeen meidän on ladattava skriptimme Google Cloud Storageen ja kopioitava ne Google Cloud Sheliin. Lataaminen pilvitallennustilaan on melko triviaalia (kuvaus löytyy
Kuvio 2
Alla on lueteltu komennot, joita tarvitsemme tiedostojen kopioimiseen ja tarvittavien kirjastojen asentamiseen.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Tietokannan ja taulukon luominen
Kun olemme suorittaneet kaikki määritykseen liittyvät vaiheet, meidän on seuraavaksi luotava tietojoukko ja taulukko BigQueryssa. On olemassa useita tapoja tehdä tämä, mutta yksinkertaisin on käyttää Google Cloud -konsolia luomalla ensin tietojoukko. Voit seurata alla olevia ohjeita
Kuva 3. Taulukon asettelu
Käyttäjälokitietojen julkaiseminen
Pub/Sub on putkistomme kriittinen osa, koska sen avulla useat itsenäiset sovellukset voivat kommunikoida keskenään. Se toimii erityisesti välittäjänä, jonka avulla voimme lähettää ja vastaanottaa viestejä sovellusten välillä. Ensimmäinen asia, joka meidän on tehtävä, on luoda aihe. Mene vain konsolin Pub/Sub-kohtaan ja napsauta LUO AIHE.
Alla oleva koodi kutsuu komentosarjaamme luomaan yllä määritellyt lokitiedot ja muodostaa sitten yhteyden ja lähettää lokit Pub/Subille. Ainoa asia, joka meidän on tehtävä, on luoda esine PublisherClient, määritä polku aiheeseen menetelmällä topic_path
ja kutsu funktio publish
с topic_path
ja dataa. Huomaa, että tuomme generate_log_line
käsikirjoituksestamme stream_logs
, joten varmista, että nämä tiedostot ovat samassa kansiossa, muuten saat tuontivirheen. Voimme sitten suorittaa tämän Google-konsolimme kautta käyttämällä:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Heti kun tiedosto suoritetaan, voimme nähdä lokitietojen tulosteen konsoliin alla olevan kuvan mukaisesti. Tämä skripti toimii niin kauan kuin emme käytä CTRL + Ctäydentämään sitä.
Kuva 4. Lähtö publish_logs.py
Kirjoitetaan putkistokoodiamme
Nyt kun meillä on kaikki valmiina, voimme aloittaa hauskan osan - putkilinjamme koodaamisen Beamilla ja Pythonilla. Beam-liukuhihnan luomiseksi meidän on luotava liukuhihnaobjekti (p). Kun olemme luoneet liukuhihnaobjektin, voimme käyttää useita toimintoja peräkkäin käyttämällä operaattoria pipe (|)
. Yleisesti ottaen työnkulku näyttää alla olevan kuvan kalta.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Luomme koodissamme kaksi mukautettua funktiota. Toiminto regex_clean
, joka skannaa tiedot ja hakee vastaavan rivin PATTERNS-luettelon perusteella funktion avulla re.search
. Funktio palauttaa pilkuilla erotetun merkkijonon. Jos et ole säännöllisten lausekkeiden asiantuntija, suosittelen tutustumaan tähän datetime
toiminnon sisällä saadaksesi sen toimimaan. Sain tuontivirheen tiedoston alussa, mikä oli outoa. Tämä luettelo välitetään sitten funktiolle WriteToBigQuery, joka yksinkertaisesti lisää tietomme taulukkoon. Batch DataFlow Jobin ja Streaming DataFlow Jobin koodi on annettu alla. Ainoa ero erä- ja suoratoistokoodin välillä on se, että erässä luemme CSV:n src_path
funktiota käyttämällä ReadFromText
Beamista.
Erä DataFlow Job (eräkäsittely)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
DataFlow-työn suoratoisto (streamin käsittely)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Kuljettimen käynnistäminen
Voimme ajaa putkia useilla eri tavoilla. Jos halusimme, voisimme vain ajaa sen paikallisesti päätteestä samalla kun kirjaudumme GCP:hen etänä.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Aiomme kuitenkin suorittaa sen käyttämällä DataFlow'ta. Voimme tehdä tämän käyttämällä alla olevaa komentoa asettamalla seuraavat pakolliset parametrit.
project
— GCP-projektisi tunnus.runner
on putkisto, joka analysoi ohjelmasi ja rakentaa putkilinjasi. Jotta voit suorittaa pilvessä, sinun on määritettävä DataflowRunner.staging_location
— polku Cloud Dataflow -pilvitallennustilaan työn suorittavien prosessorien tarvitsemien koodipakettien indeksointia varten.temp_location
— polku Cloud Dataflow -pilvitallennustilaan liukuhihnan ollessa käynnissä luotujen väliaikaisten työtiedostojen tallentamista varten.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Kun tämä komento on käynnissä, voimme siirtyä Google-konsolin DataFlow-välilehteen ja tarkastella putkistoamme. Kun napsautamme liukuhihnaa, meidän pitäisi nähdä jotain kuvan 4 kaltaista. Viankorjaustarkoituksiin voi olla erittäin hyödyllistä siirtyä lokeihin ja sitten Stackdriveriin tarkastellaksesi yksityiskohtaisia lokeja. Tämä on auttanut minua ratkaisemaan putkien ongelmia useissa tapauksissa.
Kuva 4: Palkkikuljetin
Pääsy tietoihimme BigQueryssa
Meillä pitäisi siis jo olla käynnissä putki, jonka tiedot virtaavat taulukkoomme. Tämän testaamiseksi voimme siirtyä BigQueryyn ja tarkastella tietoja. Alla olevan komennon käytön jälkeen sinun pitäisi nähdä tietojoukon ensimmäiset rivit. Nyt kun data on tallennettu BigQueryyn, voimme tehdä lisäanalyysejä, jakaa tiedot työtovereiden kanssa ja alkaa vastata liiketoimintakysymyksiin.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Kuva 5: BigQuery
Johtopäätös
Toivomme, että tämä viesti toimii hyödyllisenä esimerkkinä suoratoistodatan luomisesta sekä tapojen löytämisestä datan helpottamiseksi. Tietojen tallentaminen tässä muodossa antaa meille monia etuja. Nyt voimme alkaa vastata tärkeisiin kysymyksiin, kuten kuinka moni käyttää tuotettamme? Kasvaako käyttäjäkuntasi ajan myötä? Mihin tuotteen osa-alueisiin ihmiset ovat eniten vuorovaikutuksessa? Ja onko virheitä siellä, missä niitä ei pitäisi olla? Nämä ovat kysymyksiä, jotka kiinnostavat organisaatiota. Näiden kysymysten vastauksista saamien oivallusten perusteella voimme parantaa tuotetta ja lisätä käyttäjien sitoutumista.
Beam on todella hyödyllinen tämäntyyppisessä harjoituksessa ja sillä on myös monia muita mielenkiintoisia käyttötapauksia. Voit esimerkiksi analysoida osakekurssitietoja reaaliajassa ja tehdä kauppoja analyysin perusteella, kenties sinulla on ajoneuvoista tulevaa anturidataa ja haluat laskea liikennetason laskelmia. Voit myös olla esimerkiksi peliyritys, joka kerää käyttäjätietoja ja luo sen avulla kojetauluja tärkeimpien mittareiden seuraamiseksi. Okei, herrat, tämä on toisen postauksen aihe, kiitos lukemisesta, ja niille, jotka haluavat nähdä koko koodin, alla on linkki GitHubiini.
Se on kaikki.
Lähde: will.com