Sziasztok. Megosztjuk a cikk utolsó részének fordítását, amely kifejezetten a kurzus hallgatói számára készült.
Apache Beam és DataFlow valós idejű csővezetékekhez
A Google Cloud beállítása
Megjegyzés: A folyamat futtatásához és az egyéni naplóadatok közzétételéhez a Google Cloud Shell szolgáltatást használtam, mert problémáim voltak a folyamat Python 3-ban való futtatásával. A Google Cloud Shell a Python 2-t használja, amely jobban konzisztens az Apache Beammel.
A csővezeték elindításához egy kicsit bele kell ásnunk a beállításokba. Azok számára, akik korábban nem használták a GCP-t, az alábbi 6 lépést kell követniük
Ezt követően fel kell töltenünk szkriptjeinket a Google Cloud Storage szolgáltatásba, és át kell másolnunk őket a Google Cloud Shelbe. A felhőtárhelyre való feltöltés meglehetősen triviális (leírás található
Ábra 2
Az alábbiakban felsoroljuk azokat a parancsokat, amelyekre szükségünk van a fájlok másolásához és a szükséges könyvtárak telepítéséhez.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Adatbázisunk és táblázatunk elkészítése
Miután elvégeztük az összes beállítással kapcsolatos lépést, a következő dolgunk az, hogy létrehozunk egy adatkészletet és táblázatot a BigQueryben. Ennek többféle módja is van, de a legegyszerűbb a Google Cloud konzol használata az adatkészlet létrehozásával. Kövesse az alábbi lépéseket
3. ábra Táblázat elrendezése
Felhasználói naplóadatok közzététele
A Pub/Sub kritikus összetevője folyamatunknak, mivel lehetővé teszi több független alkalmazás egymás közötti kommunikációját. Különösen közvetítőként működik, amely lehetővé teszi számunkra, hogy üzeneteket küldjünk és fogadjunk az alkalmazások között. Az első dolog, amit tennünk kell, egy téma létrehozása. Egyszerűen lépjen a Pub/Sub elemre a konzolon, és kattintson a TÉMA LÉTREHOZÁSA lehetőségre.
Az alábbi kód meghívja a szkriptünket, hogy létrehozza a fent meghatározott naplóadatokat, majd összekapcsolja és elküldi a naplókat a Pub/Sub-nak. Az egyetlen dolog, amit tennünk kell, egy objektum létrehozása PublisherClient, adja meg a téma elérési útját a metódus segítségével topic_path
és hívja meg a függvényt publish
с topic_path
és adatok. Felhívjuk figyelmét, hogy importálunk generate_log_line
forgatókönyvünkből stream_logs
, ezért ügyeljen arra, hogy ezek a fájlok ugyanabban a mappában legyenek, különben importálási hibaüzenetet kap. Ezt követően lefuttathatjuk ezt a google konzolunkon a következő használatával:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Amint a fájl lefut, láthatjuk a naplóadatok kimenetét a konzolon, ahogy az az alábbi ábrán is látható. Ez a szkript mindaddig működik, amíg nem használjuk CTRL + Cbefejezni.
4. ábra Kimenet publish_logs.py
Csővezeték kódunk írása
Most, hogy mindent előkészítettünk, elkezdhetjük a szórakoztató részt – a folyamat kódolását Beam és Python segítségével. Beam pipeline létrehozásához létre kell hoznunk egy folyamat objektumot (p). Miután létrehoztunk egy pipeline objektumot, az operátor segítségével egymás után több függvényt is alkalmazhatunk pipe (|)
. Általában a munkafolyamat úgy néz ki, mint az alábbi képen.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Kódunkban két egyedi függvényt fogunk létrehozni. Funkció regex_clean
, amely beolvassa az adatokat, és a funkció segítségével lekéri a megfelelő sort a MINTÁK lista alapján re.search
. A függvény vesszővel elválasztott karakterláncot ad vissza. Ha nem vagy reguláris kifejezések szakértője, azt javaslom, hogy nézze meg ezt datetime
egy függvényen belül, hogy működjön. A fájl elején importálási hibát kaptam, ami furcsa volt. Ezt a listát ezután átadja a függvénynek WriteToBigQuery, amely egyszerűen hozzáadja adatainkat a táblázathoz. A Batch DataFlow Job és a Streaming DataFlow Job kódja alább látható. Az egyetlen különbség a kötegelt és a streaming kód között az, hogy kötegben a CSV-t olvassuk ki src_path
funkció használatával ReadFromText
a Beamtől.
Batch DataFlow Job (kötegelt feldolgozás)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow feladat (adatfolyam feldolgozás)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
A szállítószalag indítása
A csővezetéket többféleképpen is futtathatjuk. Ha akarjuk, csak helyileg futtathatjuk egy terminálról, miközben távolról bejelentkezünk a GCP-be.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Azonban a DataFlow használatával fogjuk futtatni. Ezt az alábbi paranccsal tehetjük meg a következő szükséges paraméterek beállításával.
project
— GCP-projektjének azonosítója.runner
egy folyamatfutó, amely elemzi a programját, és megszerkeszti a folyamatot. A felhőben való futtatáshoz meg kell adnia egy DataflowRunner-t.staging_location
— a Cloud Dataflow felhőtároló elérési útja a munkát végző processzorok által szükséges kódcsomagok indexeléséhez.temp_location
— a Cloud Dataflow felhőtárhely elérési útja a folyamat közben létrehozott ideiglenes munkafájlok tárolására.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Amíg ez a parancs fut, a google konzol DataFlow lapjára léphetünk, és megtekinthetjük a folyamatunkat. Amikor rákattintunk a folyamatra, valami hasonlót kell látnunk, mint a 4. ábra. Hibakeresési célokra nagyon hasznos lehet a Naplók, majd a Stackdriver megnyitása a részletes naplók megtekintéséhez. Ez számos esetben segített megoldani a csővezeték-problémákat.
4. ábra: Gerenda szállítószalag
Hozzáférhet adatainkhoz a BigQuery szolgáltatásban
Tehát már futnia kell egy folyamatnak, amelyen az adatok a táblánkba áramlanak. Ennek teszteléséhez felkereshetjük a BigQuery-t, és megnézhetjük az adatokat. Az alábbi parancs használata után látnia kell az adatkészlet első néhány sorát. Most, hogy a BigQuery-ben tárolt adatok birtokában vagyunk, további elemzéseket végezhetünk, valamint megoszthatjuk az adatokat a kollégákkal, és elkezdhetünk válaszolni az üzleti kérdésekre.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5. ábra: BigQuery
Következtetés
Reméljük, hogy ez a bejegyzés hasznos példaként szolgál a streaming adatfolyam létrehozására, valamint arra, hogy módokat találjunk az adatok hozzáférhetőbbé tételére. Az adatok tárolása ebben a formátumban számos előnnyel jár. Most elkezdhetünk választ adni olyan fontos kérdésekre, mint például, hogy hányan használják termékünket? Növekszik a felhasználói bázisa az idő múlásával? A termék mely aspektusaival lépnek kapcsolatba a legtöbbet az emberek? És vannak olyan hibák, ahol nem kellene? Ezek azok a kérdések, amelyek érdekelni fogják a szervezetet. Az ezekre a kérdésekre adott válaszokból származó meglátások alapján javíthatjuk a terméket és növelhetjük a felhasználói elkötelezettséget.
A Beam nagyon hasznos az ilyen típusú gyakorlatokhoz, és számos más érdekes felhasználási esettel is rendelkezik. Például érdemes lehet valós időben elemezni a tőzsdei árfolyamadatokat, és az elemzés alapján kereskedéseket kötni, esetleg vannak járművekből származó szenzoradatok, és forgalomszint számításokat szeretne végezni. Lehetne például egy szerencsejáték-cég is, amely felhasználói adatokat gyűjt, és ezek alapján irányítópultokat hoz létre a legfontosabb mutatók követésére. Oké, uraim, ez egy másik bejegyzés témája, köszönöm, hogy elolvastad, és aki szeretné látni a teljes kódot, az alábbiakban a GitHub-om linkje.
Ez minden.
Forrás: will.com