Бәріңе сәлем. Біз курс студенттері үшін арнайы дайындалған мақаланың қорытынды бөлімінің аудармасымен бөлісеміз.
Нақты уақыттағы құбыр желілеріне арналған Apache Beam және DataFlow
Google Cloud орнатылуда
Ескертпе: Мен құбырды іске қосу және реттелетін журнал деректерін жариялау үшін Google Cloud Shell қолданбасын пайдаландым, себебі Python 3 жүйесінде құбырды іске қосуда қиындықтар туындады. Google Cloud Shell Apache Beam жүйесіне сәйкес келетін Python 2 нұсқасын пайдаланады.
Құбырды бастау үшін параметрлерді аздап қазып алу керек. Бұрын GCP қолданбағандарыңыз үшін осында көрсетілген келесі 6 қадамды орындауыңыз қажет
Осыдан кейін біз сценарийлерді Google Cloud Storage жүйесіне жүктеп салып, оларды Google Cloud Shel жүйесіне көшіруіміз керек. Бұлттық қоймаға жүктеп салу өте маңызды емес (сипаттаманы табуға болады
Сурет 2
Файлдарды көшіру және қажетті кітапханаларды орнату үшін бізге қажет командалар төменде берілген.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Деректер базасын және кестемізді құру
Орнатуға қатысты барлық қадамдарды орындағаннан кейін, бізге қажет келесі нәрсе - BigQuery-де деректер жинағы мен кестені жасау. Мұны істеудің бірнеше жолы бар, бірақ ең қарапайымы - алдымен деректер жинағын жасау арқылы Google Cloud консолін пайдалану. Төмендегі қадамдарды орындауға болады
Сурет 3. Кестенің орналасуы
Пайдаланушы журналының деректерін жариялау
Pub/Sub - біздің құбырымыздың маңызды құрамдас бөлігі, себебі ол бірнеше тәуелсіз қолданбалардың бір-бірімен байланысуына мүмкіндік береді. Атап айтқанда, ол қосымшалар арасында хабарламаларды жіберуге және алуға мүмкіндік беретін делдал ретінде жұмыс істейді. Бізге бірінші кезекте тақырып құру керек. Консольдегі Pub/Sub бөліміне өтіп, ТАҚЫРЫП ЖАСАУ түймесін басыңыз.
Төмендегі код жоғарыда анықталған журнал деректерін жасау үшін сценарийді шақырады, содан кейін журналдарды Pub/Sub қызметіне қосады және жібереді. Бізге тек объект жасау керек PublisherClient, әдісі арқылы тақырыпқа жолды көрсетіңіз topic_path
және функцияны шақырыңыз publish
с topic_path
және деректер. Біз импорттайтынымызды ескеріңіз generate_log_line
біздің сценарийден stream_logs
, сондықтан бұл файлдардың бір қалтада екеніне көз жеткізіңіз, әйтпесе импорттау қатесін аласыз. Содан кейін біз оны Google консолі арқылы іске қоса аламыз:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Файл іске қосылғаннан кейін біз төмендегі суретте көрсетілгендей журнал деректерінің консольге шығуын көре аламыз. Бұл сценарий біз пайдаланбағанша жұмыс істейді CTRL + Cоны аяқтау үшін.
Сурет 4. Шығару publish_logs.py
Біздің құбыр кодын жазу
Енді бізде барлығы дайын, біз қызықты бөлікті - Beam және Python көмегімен құбырды кодтауды бастай аламыз. Beam құбырын жасау үшін құбыр нысанын (p) жасау керек. Біз конвейер нысанын жасағаннан кейін операторды пайдаланып бірнеше функцияларды бірінен соң бірін қолдана аламыз pipe (|)
. Жалпы, жұмыс процесі төмендегі суретке ұқсайды.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Біздің кодта біз екі теңшелетін функцияны жасаймыз. Функция regex_clean
, ол деректерді сканерлейді және функцияны пайдаланып PATTERNS тізімі негізінде сәйкес жолды шығарады re.search
. Функция үтірмен бөлінген жолды қайтарады. Егер сіз тұрақты өрнек маманы болмасаңыз, мен мұны тексеруді ұсынамын datetime
оны жұмыс істеу үшін функцияның ішінде. Мен файлдың басында импорттау қатесін алдым, бұл біртүрлі болды. Содан кейін бұл тізім функцияға беріледі WriteToBigQuery, бұл біздің деректерімізді кестеге жай ғана қосады. Пакеттік DataFlow тапсырмасы және Streaming DataFlow тапсырмасының коды төменде берілген. Пакет пен ағындық кодтың арасындағы жалғыз айырмашылық - бұл пакетте біз CSV файлын оқимыз src_path
функциясын пайдалану ReadFromText
Beam бастап.
Пакеттік DataFlow тапсырмасы (пакетті өңдеу)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (ағынды өңдеу)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Конвейерді іске қосу
Біз құбырды бірнеше түрлі жолмен жүргізе аламыз. Қаласақ, GCP жүйесіне қашықтан кіру кезінде оны терминалдан жергілікті түрде іске қоса аламыз.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Дегенмен, біз оны DataFlow арқылы іске қосамыз. Біз мұны төмендегі пәрменді пайдаланып, келесі қажетті параметрлерді орнату арқылы жасай аламыз.
project
— GCP жобаңыздың идентификаторы.runner
сіздің бағдарламаңызды талдайтын және құбырды құрастыратын құбыр жолшысы. Бұлтта іске қосу үшін DataflowRunner көрсету керек.staging_location
— жұмысты орындайтын процессорларға қажет код пакеттерін индекстеу үшін Cloud Dataflow бұлттық қоймасына апаратын жол.temp_location
— құбыр жұмыс істеп тұрған кезде жасалған уақытша жұмыс файлдарын сақтауға арналған Cloud Dataflow бұлттық қоймасына апаратын жол.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Бұл пәрмен жұмыс істеп тұрғанда, біз Google консоліндегі DataFlow қойындысына өтіп, құбырымызды көре аламыз. Құбырды басқан кезде біз 4-суретке ұқсас нәрсені көруіміз керек. Түзету мақсатында егжей-тегжейлі журналдарды көру үшін Журналдар, содан кейін Stackdriver бөліміне өту өте пайдалы болуы мүмкін. Бұл маған бірқатар жағдайларда құбыр мәселелерін шешуге көмектесті.
4-сурет: Арқалық конвейер
BigQuery ішіндегі деректерімізге қол жеткізіңіз
Сонымен, бізде кестеге деректер ағынымен жұмыс істейтін құбыр болуы керек. Мұны тексеру үшін BigQuery-ге өтіп, деректерді қарауға болады. Төмендегі пәрменді пайдаланғаннан кейін деректер жиынының алғашқы бірнеше жолын көруіңіз керек. Енді бізде BigQuery-де сақталған деректер бар, біз әрі қарай талдау жасай аламыз, сонымен қатар деректерді әріптестермен бөлісіп, бизнес сұрақтарына жауап бере аламыз.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5-сурет: BigQuery
қорытынды
Бұл пост ағынды деректер құбырын құрудың, сондай-ақ деректерді қол жетімді ету жолдарын табудың пайдалы үлгісі болады деп үміттенеміз. Бұл форматта деректерді сақтау бізге көптеген артықшылықтар береді. Енді біздің өнімді қанша адам пайдаланады сияқты маңызды сұрақтарға жауап беруге болады. Сіздің пайдаланушы базаңыз уақыт өте келе өсіп жатыр ма? Адамдар өнімнің қандай аспектілерімен көбірек әрекеттеседі? Ал болмауы керек жерде қателер бар ма? Бұл ұйымды қызықтыратын сұрақтар. Осы сұрақтардың жауаптарынан алынған түсініктерге сүйене отырып, біз өнімді жақсарта аламыз және пайдаланушылардың қатысуын арттыра аламыз.
Beam жаттығудың бұл түрі үшін өте пайдалы және басқа да қызықты қолдану жағдайлары бар. Мысалы, сіз нақты уақыт режимінде акция деректерін талдап, талдау негізінде сауда жасағыңыз келуі мүмкін, мүмкін сізде көліктерден келетін сенсор деректері бар және трафик деңгейінің есептеулерін есептегіңіз келеді. Сондай-ақ, мысалы, пайдаланушы деректерін жинайтын және оны негізгі көрсеткіштерді бақылау үшін бақылау тақталарын жасау үшін пайдаланатын ойын компаниясы бола аласыз. Жарайды, мырзалар, бұл басқа посттың тақырыбы, оқығаныңыз үшін рахмет және толық кодты көргісі келетіндер үшін төменде менің GitHub сілтемесі бар.
Мұның бәрі.
Ақпарат көзі: www.habr.com