Բարեւ բոլորին. Կիսվում ենք հոդվածի վերջին մասի թարգմանությամբ՝ պատրաստված հատուկ դասընթացի ուսանողների համար։
Apache Beam և DataFlow իրական ժամանակի խողովակաշարերի համար
Google Cloud-ի կարգավորում
Նշում. ես օգտագործել եմ Google Cloud Shell-ը խողովակաշարը գործարկելու և հատուկ տեղեկամատյանների տվյալները հրապարակելու համար, քանի որ խնդիրներ էի ունենում Python 3-ում խողովակաշարը վարելու համար: Google Cloud Shell-ն օգտագործում է Python 2, որն ավելի համահունչ է Apache Beam-ին:
Խողովակաշարը սկսելու համար մենք պետք է մի փոքր փորփրենք պարամետրերը: Ձեզանից նրանց համար, ովքեր նախկինում չեն օգտագործել GCP-ն, դուք պետք է հետևեք հետևյալ 6 քայլերին, որոնք նկարագրված են դրանում
Դրանից հետո մենք պետք է վերբեռնենք մեր սկրիպտները Google Cloud Storage-ում և պատճենենք դրանք մեր Google Cloud Shel-ում: Ամպային պահեստում վերբեռնումը բավականին աննշան է (կարելի է գտնել նկարագրությունը
Նկար 2
Ֆայլերը պատճենելու և անհրաժեշտ գրադարանները տեղադրելու համար մեզ անհրաժեշտ հրամանները ներկայացված են ստորև:
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Մեր տվյալների բազայի և աղյուսակի ստեղծում
Երբ մենք ավարտենք կարգավորումների հետ կապված բոլոր քայլերը, հաջորդ բանը, որ մենք պետք է անենք, BigQuery-ում տվյալների բազա և աղյուսակ ստեղծելն է: Դա անելու մի քանի եղանակ կա, բայց ամենապարզը Google Cloud կոնսոլից օգտվելն է՝ նախ ստեղծելով տվյալների բազա: Դուք կարող եք հետևել ստորև նշված քայլերին
Նկար 3. Սեղանի դասավորությունը
Օգտագործողի մատյանների տվյալների հրապարակում
Pub/Sub-ը մեր խողովակաշարի կարևոր բաղադրիչն է, քանի որ այն թույլ է տալիս մի քանի անկախ հավելվածների հաղորդակցվել միմյանց հետ: Մասնավորապես, այն աշխատում է որպես միջնորդ, որը թույլ է տալիս մեզ ուղարկել և ստանալ հաղորդագրություններ հավելվածների միջև: Առաջին բանը, որ մենք պետք է անենք, թեմա ստեղծելն է: Պարզապես գնացեք Pub/Sub կոնսոլում և սեղմեք «ՍՏԵՂԾԵԼ ԹԵՄԱ»:
Ստորև բերված կոդը կանչում է մեր սկրիպտը՝ ստեղծելու վերևում սահմանված մատյան տվյալները, այնուհետև միացնում և ուղարկում է տեղեկամատյանները Pub/Sub-ին: Միակ բանը, որ մենք պետք է անենք, օբյեկտ ստեղծելն է PublisherClient, մեթոդով նշեք թեմայի ուղին topic_path
և զանգահարեք ֆունկցիան publish
с topic_path
և տվյալներ։ Խնդրում ենք նկատի ունենալ, որ մենք ներմուծում ենք generate_log_line
մեր սցենարից stream_logs
, այնպես որ համոզվեք, որ այս ֆայլերը գտնվում են նույն թղթապանակում, հակառակ դեպքում դուք ներմուծման սխալ կստանաք: Այնուհետև մենք կարող ենք սա գործարկել մեր Google վահանակի միջոցով՝ օգտագործելով.
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Հենց որ ֆայլը գործարկվի, մենք կկարողանանք տեսնել տեղեկամատյանների տվյալների ելքը դեպի վահանակ, ինչպես ցույց է տրված ստորև նկարում: Այս սցենարը կաշխատի այնքան ժամանակ, քանի դեռ մենք չենք օգտագործում CTRL + Cայն ավարտելու համար:
Նկար 4. Արդյունք publish_logs.py
Գրելով մեր խողովակաշարի կոդը
Այժմ, երբ մենք ամեն ինչ պատրաստ ենք, կարող ենք սկսել զվարճալի մասը՝ մեր խողովակաշարի կոդավորումը Beam-ի և Python-ի միջոցով: Beam խողովակաշար ստեղծելու համար մենք պետք է ստեղծենք խողովակաշարի օբյեկտ (p): Երբ մենք ստեղծենք խողովակաշարի օբյեկտ, մենք կարող ենք կիրառել բազմաթիվ գործառույթներ մեկը մյուսի հետևից՝ օգտագործելով օպերատորը pipe (|)
. Ընդհանուր առմամբ, աշխատանքային հոսքը նման է ստորև ներկայացված պատկերին:
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Մեր կոդում մենք կստեղծենք երկու հատուկ գործառույթ: Գործառույթ regex_clean
, որը սկանավորում է տվյալները և առբերում համապատասխան տողը PATTERNS ցուցակի հիման վրա՝ օգտագործելով ֆունկցիան re.search
. Ֆունկցիան վերադարձնում է ստորակետերով առանձնացված տող: Եթե դուք սովորական արտահայտման մասնագետ չեք, խորհուրդ եմ տալիս ստուգել սա datetime
ֆունկցիայի ներսում, որպեսզի այն աշխատի: Ես ֆայլի սկզբում ներմուծման սխալ էի ստանում, ինչը տարօրինակ էր: Այս ցուցակն այնուհետև փոխանցվում է ֆունկցիային WriteToBigQuery, որը պարզապես ավելացնում է մեր տվյալները աղյուսակում: Batch DataFlow Job-ի և Streaming DataFlow Job-ի կոդը տրված է ստորև: Փաթեթի և հոսքային կոդի միջև միակ տարբերությունն այն է, որ խմբաքանակում մենք կարդում ենք CSV-ն src_path
օգտագործելով գործառույթը ReadFromText
Beam-ից:
Batch DataFlow Job (խմբաքանակի մշակում)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (հոսքի մշակում)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Փոխակրիչի գործարկում
Մենք կարող ենք խողովակաշարը վարել մի քանի տարբեր ձևերով: Եթե ցանկանայինք, մենք պարզապես կարող էինք այն գործարկել լոկալ տերմինալից՝ հեռակա կարգով GCP մուտք գործելիս:
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Այնուամենայնիվ, մենք պատրաստվում ենք այն գործարկել DataFlow-ի միջոցով: Մենք կարող ենք դա անել՝ օգտագործելով ստորև նշված հրամանը՝ սահմանելով հետևյալ պահանջվող պարամետրերը:
project
— Ձեր GCP նախագծի ID-ն:runner
խողովակաշար է, որը կվերլուծի ձեր ծրագիրը և կկառուցի ձեր խողովակաշարը: Ամպում աշխատելու համար դուք պետք է նշեք DataflowRunner:staging_location
— ճանապարհը դեպի Cloud Dataflow ամպային պահեստ՝ աշխատանքը կատարող պրոցեսորների կողմից անհրաժեշտ կոդային փաթեթների ինդեքսավորման համար:temp_location
— ճանապարհ դեպի Cloud Dataflow ամպային պահեստ՝ խողովակաշարի գործարկման ընթացքում ստեղծված աշխատանքային ժամանակավոր ֆայլերը պահելու համար:streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Մինչ այս հրամանն աշխատում է, մենք կարող ենք գնալ google վահանակի DataFlow ներդիր և դիտել մեր խողովակաշարը: Երբ մենք սեղմում ենք խողովակաշարի վրա, մենք պետք է տեսնենք մի բան, որը նման է Նկար 4-ին: Վրիպազերծման նպատակների համար կարող է շատ օգտակար լինել գնալ Logs, այնուհետև Stackdriver՝ մանրամասն տեղեկամատյանները դիտելու համար: Սա օգնել է ինձ լուծել խողովակաշարի խնդիրները մի շարք դեպքերում:
Նկար 4. Ճառագայթային փոխակրիչ
Մուտք գործեք մեր տվյալները BigQuery-ում
Այսպիսով, մենք արդեն պետք է ունենանք խողովակաշար, որն աշխատում է մեր աղյուսակի մեջ հոսող տվյալների հետ: Սա փորձարկելու համար մենք կարող ենք գնալ BigQuery և նայել տվյալները: Ստորև նշված հրամանն օգտագործելուց հետո դուք պետք է տեսնեք տվյալների հավաքածուի առաջին մի քանի տողերը: Այժմ, երբ մենք ունենք BigQuery-ում պահվող տվյալները, մենք կարող ենք հետագա վերլուծություններ կատարել, ինչպես նաև կիսվել տվյալները գործընկերների հետ և սկսել պատասխանել բիզնեսի հարցերին:
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Նկար 5. BigQuery
Ամփոփում
Հուսով ենք, որ այս գրառումը օգտակար օրինակ կծառայի հոսքային տվյալների խողովակաշար ստեղծելու, ինչպես նաև տվյալների ավելի մատչելի դարձնելու ուղիներ գտնելու համար: Այս ձևաչափով տվյալների պահպանումը մեզ շատ առավելություններ է տալիս: Այժմ մենք կարող ենք սկսել պատասխանել այնպիսի կարևոր հարցերի, ինչպիսիք են, թե քանի մարդ է օգտվում մեր արտադրանքից: Ձեր օգտատերերի բազան ժամանակի ընթացքում աճու՞մ է: Ապրանքի ո՞ր ասպեկտների հետ են մարդիկ ամենաշատը շփվում: Եվ կա՞ն սխալներ, որտեղ չպետք է լինեն: Սրանք այն հարցերն են, որոնք կհետաքրքրեն կազմակերպությանը։ Հիմնվելով այս հարցերի պատասխաններից ստացված պատկերացումների վրա՝ մենք կարող ենք բարելավել արտադրանքը և ավելացնել օգտատերերի ներգրավվածությունը:
Beam-ը իսկապես օգտակար է այս տեսակի վարժությունների համար և ունի նաև մի շարք այլ հետաքրքիր օգտագործման դեպքեր: Օրինակ, դուք կարող եք վերլուծել բաժնետոմսերի տիզերի տվյալները իրական ժամանակում և գործարքներ կատարել վերլուծության հիման վրա, գուցե դուք ունեք սենսորային տվյալներ, որոնք գալիս են տրանսպորտային միջոցներից և ցանկանում եք հաշվարկել երթևեկության մակարդակի հաշվարկները: Դուք կարող եք նաև լինել, օրինակ, խաղային ընկերություն, որը հավաքում է օգտատերերի տվյալները և օգտագործում դրանք՝ ստեղծելու վահանակներ՝ հիմնական չափումները հետևելու համար: Լավ, պարոնայք, սա հերթական գրառման թեմա է, շնորհակալություն կարդալու համար, իսկ նրանց համար, ովքեր ցանկանում են տեսնել ամբողջական կոդը, ստորև ներկայացնում ենք իմ GitHub-ի հղումը:
Դա բոլորն են:
Source: www.habr.com