Баарына салам. Курстун студенттери үчүн атайын даярдалган макаланын жыйынтыктоочу бөлүгүнүн котормосу менен бөлүшөбүз.
Apache Beam жана реалдуу убакыт куурлары үчүн DataFlow
Google Булут орнотулууда
Эскертүү: Мен Google Cloud Shell'ди конвейерди иштетүү жана ыңгайлаштырылган журнал дайындарын жарыялоо үчүн колдондум, анткени Python 3'те конвейерди иштетүүдө кыйынчылыктар болуп жатты. Google Cloud Shell Python 2ди колдонот, ал Apache Beam менен көбүрөөк шайкеш келет.
Түтүктү баштоо үчүн, биз орнотууларды бир аз казып алышыбыз керек. Мурда GCP колдонбогондоруңуз үчүн, анда көрсөтүлгөн төмөнкү 6 кадамды аткарышыңыз керек болот
Андан кийин, биз скрипттерибизди Google Cloud Storage'ге жүктөп, аларды Google Cloud Shel'ибизге көчүрүшүбүз керек. Булуттагы сактагычка жүктөө өтө маанилүү (сүрөттөмөсүн тапса болот
Figure 2
Файлдарды көчүрүү жана керектүү китепканаларды орнотуу үчүн бизге керектүү буйруктар төмөндө келтирилген.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Биздин базаны жана таблицаны түзүү
Орнотууга байланыштуу бардык кадамдарды аткаргандан кийин, биз BigQueryде берилиштер топтомун жана таблицаны түзүшүбүз керек. Муну жасоонун бир нече жолу бар, бирок эң жөнөкөйү – адегенде маалымат топтомун түзүү менен Google Булут консолун колдонуу. Сиз төмөндөгү кадамдарды аткара аласыз
3-сүрөт. Таблица макети
Колдонуучу журналынын маалыматтарын жарыялоо
Pub/Sub биздин куурубуздун маанилүү компоненти болуп саналат, анткени ал бир нече көз карандысыз тиркемелерди бири-бири менен байланышууга мүмкүндүк берет. Атап айтканда, ал бизге тиркемелер ортосунда билдирүүлөрдү жөнөтүүгө жана кабыл алууга мүмкүндүк берген ортомчу катары иштейт. Биринчи кезекте биз тема түзүүбүз керек. Жөн гана консолдогу Pub/Sub бөлүмүнө өтүп, ТЕМА ТҮЗҮҮ дегенди басыңыз.
Төмөнкү код жогоруда аныкталган журнал маалыматтарын түзүү үчүн скриптибизди чакырат, андан кийин журналдарды Pub/Subга туташтырат жана жөнөтөт. Биз эмне кылышыбыз керек болгон бир гана нерсе - объект түзүү PublisherClient, методун колдонуу менен теманын жолун белгилеңиз topic_path
жана функцияны чакырыңыз publish
с topic_path
жана маалыматтар. Биз импорттойбуз generate_log_line
биздин сценарийден stream_logs
, андыктан бул файлдар бир папкада экенин текшериңиз, антпесе сиз импорттоо катасын аласыз. Андан кийин биз муну Google консолубуз аркылуу иштете алабыз:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Файл иштей баштаганда, биз төмөндөгү сүрөттө көрсөтүлгөндөй, консолго журнал маалыматтарынын чыгышын көрө алабыз. Бул скрипт биз колдонбой эле иштей берет CTRL + Cаны аягына чыгаруу үчүн.
Сүрөт 4. Чыгуу publish_logs.py
Биздин түтүк кодун жазуу
Эми биз бардыгын даярдап койгондон кийин, кызыктуу бөлүгүн баштай алабыз - Beam жана Python аркылуу түтүктү коддоо. Beam түтүгүн түзүү үчүн, биз түтүк объектин (р) түзүшүбүз керек. Биз түтүк объектин түзгөндөн кийин, биз операторду колдонуп бир нече функцияларды биринин артынан бири колдоно алабыз pipe (|)
. Жалпысынан алганда, иш процесси төмөндөгү сүрөттө окшош.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Биздин кодубузда биз эки ыңгайлаштырылган функцияны түзөбүз. Функция regex_clean
, бул функцияны колдонуу менен маалыматтарды сканерлейт жана PATTERNS тизмесинин негизинде тиешелүү сапты чыгарат re.search
. Функция үтүр менен бөлүнгөн сапты кайтарат. Эгер сиз кадимки сөз айкашынын адиси болбосоңуз, мен муну текшерүүнү сунуштайм datetime
анын иштеши үчүн функциянын ичинде. Мен файлдын башында импорттоо катасын алып жаткам, бул таң калыштуу болду. Бул тизме андан кийин функцияга өткөрүлүп берилет WriteToBigQuery, бул жөн гана таблицага биздин маалыматтарды кошот. Пакеттик DataFlow жумушунун жана Streaming DataFlow жумушунун коду төмөндө келтирилген. Пакет менен агымдык коддун ортосундагы бир гана айырма - бул партияда биз CSVди окуйбуз src_path
функцияны колдонуу ReadFromText
Beam тартып.
Пакеттик DataFlow жумушу (партиялык иштетүү)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (агымдык иштетүү)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Конвейерди ишке киргизүү
Биз куурду бир нече ар кандай жолдор менен иштете алабыз. Эгерде биз кааласак, GCPге алыстан кирип жатканда аны терминалдан локалдык түрдө иштетсек болот.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Бирок, биз аны DataFlow аркылуу иштетебиз. Төмөнкү талап кылынган параметрлерди орнотуу менен төмөнкү буйрукту колдонуу менен муну жасай алабыз.
project
— GCP долбооруңуздун идентификатору.runner
программаңызды талдап, түтүктү кура турган түтүк өткөргүч. Булутта иштетүү үчүн, сиз DataflowRunner көрсөтүшүңүз керек.staging_location
— ишти аткарып жаткан процессорлорго керектүү код пакеттерин индекстөө үчүн Cloud Dataflow булут сактагычына жол.temp_location
— түтүк иштеп жатканда түзүлгөн убактылуу жумуш файлдарын сактоо үчүн Cloud Dataflow булут сактагычына жол.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Бул буйрук иштеп жатканда, биз Google консолундагы DataFlow өтмөгүнө өтүп, биздин түтүктү көрө алабыз. Түтүк өткөргүчтү чыкылдатканыбызда, 4-сүрөткө окшош нерсени көрүшүбүз керек. Мүчүлүштүктөрдү оңдоо максатында, деталдуу журналдарды көрүү үчүн Logs, андан кийин Stackdriver'ге өтүү абдан пайдалуу болушу мүмкүн. Бул мага бир катар учурларда куурдагы көйгөйлөрдү чечүүгө жардам берди.
4-сүрөт: нурлуу конвейер
BigQueryдеги дайындарыбызга кириңиз
Ошентип, биздин таблицага маалыматтар агып турган куурубуз болушу керек. Муну текшерүү үчүн биз BigQuery'ге барып, маалыматтарды карай алабыз. Төмөнкү буйрукту колдонгондон кийин, маалымат топтомунун биринчи бир нече саптарын көрүшүңүз керек. Эми бизде BigQueryде сакталган маалыматтар бар, биз андан ары талдоо жүргүзө алабыз, ошондой эле маалыматтарды кесиптештер менен бөлүшүп, бизнес суроолоруна жооп бере баштайбыз.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5-сүрөт: BigQuery
жыйынтыктоо
Бул пост агымдык маалымат түтүгүн түзүүнүн, ошондой эле маалыматтарды жеткиликтүү кылуунун жолдорун табууга пайдалуу мисал болот деп үмүттөнөбүз. Бул форматта маалыматтарды сактоо бизге көптөгөн артыкчылыктарды берет. Эми биздин продуктуну канча адам колдонот деген сыяктуу маанилүү суроолорго жооп бере баштайбыз. Сиздин колдонуучу базаңыз убакыттын өтүшү менен өсүп жатабы? Адамдар продуктунун кайсы аспектилери менен көбүрөөк иштешет? Жана болбошу керек жерде каталар барбы? Мына ушул суроолор уюмду кызыктырат. Бул суроолорго жооптордон келип чыккан түшүнүктөрдүн негизинде биз өнүмдү жакшырта алабыз жана колдонуучулардын катышуусун арттыра алабыз.
Beam көнүгүүлөрдүн бул түрү үчүн чынында эле пайдалуу жана башка бир катар кызыктуу колдонуу учурлары да бар. Мисалы, сиз реалдуу убакыт режиминде запастык кенелердин маалыматтарын талдап, анализдин негизинде соода кылгыңыз келиши мүмкүн, балким, сизде транспорт каражаттарынан келген сенсор маалыматтары бар жана трафиктин деңгээлин эсептөөлөрдү эсептегиңиз келет. Ошондой эле, мисалы, колдонуучунун маалыматтарын чогултуучу жана аны негизги көрсөткүчтөрдү көзөмөлдөө үчүн панелдерди түзүү үчүн колдонгон оюн компаниясы болушу мүмкүн. Макул, мырзалар, бул башка посттун темасы, окуганыңыз үчүн рахмат жана толук кодду көрүүнү каалагандар үчүн төмөндө менин GitHub шилтемем бар.
Болгону ушул.
Source: www.habr.com