Helo i gyd. Rydym yn rhannu cyfieithiad o ran olaf yr erthygl, a baratowyd yn benodol ar gyfer myfyrwyr y cwrs.
Apache Beam a DataFlow ar gyfer Piblinellau Amser Real
Sefydlu Google Cloud
Nodyn: Defnyddiais Google Cloud Shell i redeg y biblinell a chyhoeddi data log arferol oherwydd fy mod yn cael trafferth rhedeg y biblinell yn Python 3. Mae Google Cloud Shell yn defnyddio Python 2, sy'n fwy cyson ag Apache Beam.
I ddechrau'r biblinell, mae angen inni gloddio ychydig i'r gosodiadau. I'r rhai ohonoch nad ydych wedi defnyddio GCP o'r blaen, bydd angen i chi ddilyn y 6 cham canlynol a amlinellir yn hwn
Ar Γ΄l hyn, bydd angen i ni uwchlwytho ein sgriptiau i Google Cloud Storage a'u copΓ―o i'n Google Cloud Shel. Mae uwchlwytho i storfa cwmwl yn eithaf dibwys (gellir dod o hyd i ddisgrifiad
Ffigur 2
Mae'r gorchmynion sydd eu hangen arnom i gopΓ―o'r ffeiliau a gosod y llyfrgelloedd gofynnol wedi'u rhestru isod.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Π½Π°ΡΠ΅ΠΉ Π±Π°Π·Ρ Π΄Π°Π½Π½ΡΡ ΠΈ ΡΠ°Π±Π»ΠΈΡΡ
Unwaith y byddwn wedi cwblhau'r holl gamau sy'n gysylltiedig Γ’ gosod, y peth nesaf y mae angen i ni ei wneud yw creu set ddata a thabl yn BigQuery. Mae yna sawl ffordd o wneud hyn, ond y symlaf yw defnyddio consol Google Cloud trwy greu set ddata yn gyntaf. Gallwch ddilyn y camau isod
Ffigur 3. Cynllun y tabl
Cyhoeddi data log defnyddwyr
Mae Pub/Sub yn elfen hanfodol o'n piblinell oherwydd ei fod yn caniatΓ‘u i gymwysiadau annibynnol lluosog gyfathrebu Γ’'i gilydd. Yn benodol, mae'n gweithio fel cyfryngwr sy'n ein galluogi i anfon a derbyn negeseuon rhwng ceisiadau. Y peth cyntaf sydd angen i ni ei wneud yw creu pwnc. Yn syml, ewch i Pub/Sub yn y consol a chliciwch CREATE TOPIC.
Mae'r cod isod yn galw ein sgript i gynhyrchu'r data log a ddiffinnir uchod ac yna'n cysylltu ac yn anfon y logiau i Pub / Sub. Yr unig beth sydd angen i ni ei wneud yw creu gwrthrych PublisherClient, nodwch y llwybr i'r pwnc gan ddefnyddio'r dull topic_path
ΠΈ Π²ΡΠ·Π²Π°ΡΡ ΡΡΠ½ΠΊΡΠΈΡ publish
Ρ topic_path
a data. Sylwch ein bod yn mewnforio generate_log_line
o'n sgript stream_logs
, felly gwnewch yn siΕ΅r bod y ffeiliau hyn yn yr un ffolder, fel arall fe gewch wall mewnforio. Yna gallwn redeg hwn trwy ein consol google gan ddefnyddio:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Cyn gynted ag y bydd y ffeil yn rhedeg, byddwn yn gallu gweld allbwn y data log i'r consol, fel y dangosir yn y ffigur isod. Bydd y sgript hon yn gweithio cyn belled nad ydym yn ei defnyddio CTRL + Ci'w gwblhau.
Ffigur 4. Allbwn publish_logs.py
Ysgrifennu ein cod piblinell
Nawr bod gennym bopeth wedi'i baratoi, gallwn ddechrau'r rhan hwyliog - codio ein piblinell gan ddefnyddio Beam a Python. I greu piblinell Beam, mae angen i ni greu gwrthrych piblinell (p). Unwaith y byddwn wedi creu gwrthrych piblinell, gallwn gymhwyso swyddogaethau lluosog un ar Γ΄l y llall gan ddefnyddio'r gweithredwr pipe (|)
. Yn gyffredinol, mae'r llif gwaith yn edrych fel y ddelwedd isod.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Yn ein cod, byddwn yn creu dwy swyddogaeth arferiad. Swyddogaeth regex_clean
, sy'n sganio'r data ac yn adfer y rhes gyfatebol yn seiliedig ar y rhestr PATTERNS gan ddefnyddio'r swyddogaeth re.search
. Mae'r ffwythiant yn dychwelyd llinyn wedi'i wahanu gan goma. Os nad ydych chi'n arbenigwr mynegiant rheolaidd, rwy'n argymell gwirio hyn datetime
Π²Π½ΡΡΡΠΈ ΡΡΠ½ΠΊΡΠΈΠΈ, ΡΡΠΎΠ±Ρ ΠΎΠ½Π° ΡΠ°Π±ΠΎΡΠ°Π»Π°. Π― ΠΏΠΎΠ»ΡΡΠ°Π» ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΎΠ± ΠΎΡΠΈΠ±ΠΊΠ΅ ΠΏΡΠΈ ΠΈΠΌΠΏΠΎΡΡΠ΅ Π² Π½Π°ΡΠ°Π»Π΅ ΡΠ°ΠΉΠ»Π°, ΡΡΠΎ Π±ΡΠ»ΠΎ ΡΡΡΠ°Π½Π½ΠΎ. ΠΡΠΎΡ ΡΠΏΠΈΡΠΎΠΊ Π·Π°ΡΠ΅ΠΌ ΠΏΠ΅ΡΠ΅Π΄Π°Π΅ΡΡΡ Π² ΡΡΠ½ΠΊΡΠΈΡ WriteToBigQuery, sydd yn syml yn ychwanegu ein data at y tabl. Rhoddir y cod ar gyfer Swp DataFlow Job a Streaming DataFlow Job isod. Yr unig wahaniaeth rhwng swp a chod ffrydio yw ein bod yn darllen y CSV ohono mewn swp src_path
defnyddio'r swyddogaeth ReadFromText
o Beam.
Batch DataFlow Job (ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠ° ΠΏΠ°ΠΊΠ΅ΡΠΎΠ²)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Ffrydio DataFlow Job (prosesu ffrwd)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Cychwyn y cludwr
Gallwn redeg y biblinell mewn sawl ffordd wahanol. Pe baem yn dymuno, gallem ei redeg yn lleol o derfynell wrth fewngofnodi i GCP o bell.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Fodd bynnag, rydyn ni'n mynd i'w redeg gan ddefnyddio DataFlow. Gallwn wneud hyn gan ddefnyddio'r gorchymyn isod trwy osod y paramedrau gofynnol canlynol.
project
β ID eich prosiect GCP.runner
yn rhedwr piblinell a fydd yn dadansoddi eich rhaglen ac yn adeiladu eich piblinell. I redeg yn y cwmwl, rhaid i chi nodi DataflowRunner.staging_location
- y llwybr i storfa cwmwl Cloud Dataflow ar gyfer mynegeio pecynnau cod sydd eu hangen ar y proseswyr sy'n cyflawni'r gwaith.temp_location
- llwybr i storfa cwmwl Cloud Dataflow ar gyfer storio ffeiliau swyddi dros dro a grΓ«wyd tra bod y biblinell yn rhedeg.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Tra bod y gorchymyn hwn yn rhedeg, gallwn fynd i'r tab DataFlow yn y consol google a gweld ein piblinell. Pan fyddwn yn clicio ar y biblinell, dylem weld rhywbeth tebyg i Ffigur 4. At ddibenion dadfygio, gall fod yn ddefnyddiol iawn mynd i Logs ac yna i Stackdriver i weld y logiau manwl. Mae hyn wedi fy helpu i ddatrys problemau piblinellau mewn nifer o achosion.
Ffigur 4: Cludwr trawst
Cyrchwch ein data yn BigQuery
Felly, dylai fod gennym eisoes biblinell yn rhedeg gyda data yn llifo i'n tabl. I brofi hyn, gallwn fynd i BigQuery ac edrych ar y data. Ar Γ΄l defnyddio'r gorchymyn isod dylech weld yr ychydig resi cyntaf o'r set ddata. Nawr bod gennym y data wedi'i storio yn BigQuery, gallwn gynnal dadansoddiad pellach, yn ogystal Γ’ rhannu'r data gyda chydweithwyr a dechrau ateb cwestiynau busnes.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Ffigur 5: BigQuery
Casgliad
ΠΠ°Π΄Π΅Π΅ΠΌΡΡ, ΡΡΠΎ ΡΡΠΎΡ ΠΏΠΎΡΡ ΠΏΠΎΡΠ»ΡΠΆΠΈΡ ΠΏΠΎΠ»Π΅Π·Π½ΡΠΌ ΠΏΡΠΈΠΌΠ΅ΡΠΎΠΌ ΡΠΎΠ·Π΄Π°Π½ΠΈΡ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠ³ΠΎ ΠΊΠΎΠ½Π²Π΅ΠΉΠ΅ΡΠ° Π΄Π°Π½Π½ΡΡ , Π° ΡΠ°ΠΊΠΆΠ΅ ΠΏΠΎΠΈΡΠΊΠ° ΡΠΏΠΎΡΠΎΠ±ΠΎΠ² ΡΠ΄Π΅Π»Π°ΡΡ Π΄Π°Π½Π½ΡΠ΅ Π±ΠΎΠ»Π΅Π΅ Π΄ΠΎΡΡΡΠΏΠ½ΡΠΌΠΈ. Π₯ΡΠ°Π½Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½ΡΡ Π² ΡΠ°ΠΊΠΎΠΌ ΡΠΎΡΠΌΠ°ΡΠ΅ Π΄Π°Π΅Ρ Π½Π°ΠΌ ΠΌΠ½ΠΎΠ³ΠΎ ΠΏΡΠ΅ΠΈΠΌΡΡΠ΅ΡΡΠ². Π’Π΅ΠΏΠ΅ΡΡ ΠΌΡ ΠΌΠΎΠΆΠ΅ΠΌ Π½Π°ΡΠ°ΡΡ ΠΎΡΠ²Π΅ΡΠ°ΡΡ Π½Π° Π²Π°ΠΆΠ½ΡΠ΅ Π²ΠΎΠΏΡΠΎΡΡ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, ΡΠΊΠΎΠ»ΡΠΊΠΎ Π»ΡΠ΄Π΅ΠΉ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡ Π½Π°Ρ ΠΏΡΠΎΠ΄ΡΠΊΡ? Π Π°ΡΡΠ΅Ρ Π»ΠΈ ΡΠΎ Π²ΡΠ΅ΠΌΠ΅Π½Π΅ΠΌ Π±Π°Π·Π° ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Π΅ΠΉ? Π‘ ΠΊΠ°ΠΊΠΈΠΌΠΈ Π°ΡΠΏΠ΅ΠΊΡΠ°ΠΌΠΈ ΠΏΡΠΎΠ΄ΡΠΊΡΠ° Π»ΡΠ΄ΠΈ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΡΡΡ Π±ΠΎΠ»ΡΡΠ΅ Π²ΡΠ΅Π³ΠΎ? Π Π΅ΡΡΡ Π»ΠΈ ΠΎΡΠΈΠ±ΠΊΠΈ, ΡΠ°ΠΌ Π³Π΄Π΅ ΠΈΡ Π±ΡΡΡ Π½Π΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ? ΠΡΠΎ ΡΠ΅ Π²ΠΎΠΏΡΠΎΡΡ, ΠΊΠΎΡΠΎΡΡΠ΅ Π±ΡΠ΄ΡΡ ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½Ρ Π΄Π»Ρ ΠΎΡΠ³Π°Π½ΠΈΠ·Π°ΡΠΈΠΈ. ΠΠ° ΠΎΡΠ½ΠΎΠ²Π΅ ΠΈΠ΄Π΅ΠΉ, Π²ΡΡΠ΅ΠΊΠ°ΡΡΠΈΡ ΠΈΠ· ΠΎΡΠ²Π΅ΡΠΎΠ² Π½Π° ΡΡΠΈ Π²ΠΎΠΏΡΠΎΡΡ, ΠΌΡ ΡΠΌΠΎΠΆΠ΅ΠΌ ΡΡΠΎΠ²Π΅ΡΡΠ΅Π½ΡΡΠ²ΠΎΠ²Π°ΡΡ ΠΏΡΠΎΠ΄ΡΠΊΡ ΠΈ ΠΏΠΎΠ²ΡΡΠΈΡΡ Π·Π°ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠΎΠ²Π°Π½Π½ΠΎΡΡΡ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Π΅ΠΉ.
Mae Beam yn ddefnyddiol iawn ar gyfer y math hwn o ymarfer corff ac mae ganddo nifer o achosion defnydd diddorol eraill hefyd. Er enghraifft, efallai y byddwch am ddadansoddi data ticio stoc mewn amser real a gwneud crefftau yn seiliedig ar y dadansoddiad, efallai bod gennych ddata synhwyrydd yn dod o gerbydau ac eisiau cyfrifo cyfrifiadau lefel traffig. Gallech hefyd, er enghraifft, fod yn gwmni hapchwarae sy'n casglu data defnyddwyr ac yn ei ddefnyddio i greu dangosfyrddau i olrhain metrigau allweddol. Iawn, foneddigion, mae hwn yn bwnc ar gyfer post arall, diolch am ddarllen, ac i'r rhai sydd am weld y cod llawn, isod mae'r ddolen i fy GitHub.
Dyna i gyd.
Ffynhonnell: hab.com