Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк

Баарына салам. Курстун студенттери үчүн атайын даярдалган макаланын жыйынтыктоочу бөлүгүнүн котормосу менен бөлүшөбүз. Маалымат инженери. Биринчи бөлүгүн окуй аласыз бул жерде.

Apache Beam жана реалдуу убакыт куурлары үчүн DataFlow

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк

Google Булут орнотулууда

Эскертүү: Мен Google Cloud Shell'ди конвейерди иштетүү жана ыңгайлаштырылган журнал дайындарын жарыялоо үчүн колдондум, анткени Python 3'те конвейерди иштетүүдө кыйынчылыктар болуп жатты. Google Cloud Shell Python 2ди колдонот, ал Apache Beam менен көбүрөөк шайкеш келет.

Түтүктү баштоо үчүн, биз орнотууларды бир аз казып алышыбыз керек. Мурда GCP колдонбогондоруңуз үчүн, анда көрсөтүлгөн төмөнкү 6 кадамды аткарышыңыз керек болот бет.

Андан кийин, биз скрипттерибизди Google Cloud Storage'ге жүктөп, аларды Google Cloud Shel'ибизге көчүрүшүбүз керек. Булуттагы сактагычка жүктөө өтө маанилүү (сүрөттөмөсүн тапса болот бул жерде). Файлдарыбызды көчүрүү үчүн төмөнкү 2-сүрөттөгү сол жактагы биринчи сөлөкөтүн чыкылдатуу менен куралдар тактасынан Google Cloud Shel ача алабыз.

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк
Figure 2

Файлдарды көчүрүү жана керектүү китепканаларды орнотуу үчүн бизге керектүү буйруктар төмөндө келтирилген.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Биздин базаны жана таблицаны түзүү

Орнотууга байланыштуу бардык кадамдарды аткаргандан кийин, биз BigQueryде берилиштер топтомун жана таблицаны түзүшүбүз керек. Муну жасоонун бир нече жолу бар, бирок эң жөнөкөйү – адегенде маалымат топтомун түзүү менен Google Булут консолун колдонуу. Сиз төмөндөгү кадамдарды аткара аласыз байланышсхема менен таблица түзүү. Биздин дасторкон болот 7 тилке, ар бир колдонуучу журналынын компоненттерине туура келет. Ыңгайлуу болуу үчүн, биз timelocal өзгөрмөдөн башка бардык мамычаларды саптар катары аныктайбыз жана аларды мурда түзүлгөн өзгөрмөлөргө ылайык атайбыз. Биздин столдун макети 3-сүрөттөгүдөй болушу керек.

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк
3-сүрөт. Таблица макети

Колдонуучу журналынын маалыматтарын жарыялоо

Pub/Sub биздин куурубуздун маанилүү компоненти болуп саналат, анткени ал бир нече көз карандысыз тиркемелерди бири-бири менен байланышууга мүмкүндүк берет. Атап айтканда, ал бизге тиркемелер ортосунда билдирүүлөрдү жөнөтүүгө жана кабыл алууга мүмкүндүк берген ортомчу катары иштейт. Биринчи кезекте биз тема түзүүбүз керек. Жөн гана консолдогу Pub/Sub бөлүмүнө өтүп, ТЕМА ТҮЗҮҮ дегенди басыңыз.

Төмөнкү код жогоруда аныкталган журнал маалыматтарын түзүү үчүн скриптибизди чакырат, андан кийин журналдарды Pub/Subга туташтырат жана жөнөтөт. Биз эмне кылышыбыз керек болгон бир гана нерсе - объект түзүү PublisherClient, методун колдонуу менен теманын жолун белгилеңиз topic_path жана функцияны чакырыңыз publish с topic_path жана маалыматтар. Биз импорттойбуз generate_log_line биздин сценарийден stream_logs, андыктан бул файлдар бир папкада экенин текшериңиз, антпесе сиз импорттоо катасын аласыз. Андан кийин биз муну Google консолубуз аркылуу иштете алабыз:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Файл иштей баштаганда, биз төмөндөгү сүрөттө көрсөтүлгөндөй, консолго журнал маалыматтарынын чыгышын көрө алабыз. Бул скрипт биз колдонбой эле иштей берет CTRL + Cаны аягына чыгаруу үчүн.

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк
Сүрөт 4. Чыгуу publish_logs.py

Биздин түтүк кодун жазуу

Эми биз бардыгын даярдап койгондон кийин, кызыктуу бөлүгүн баштай алабыз - Beam жана Python аркылуу түтүктү коддоо. Beam түтүгүн түзүү үчүн, биз түтүк объектин (р) түзүшүбүз керек. Биз түтүк объектин түзгөндөн кийин, биз операторду колдонуп бир нече функцияларды биринин артынан бири колдоно алабыз pipe (|). Жалпысынан алганда, иш процесси төмөндөгү сүрөттө окшош.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Биздин кодубузда биз эки ыңгайлаштырылган функцияны түзөбүз. Функция regex_clean, бул функцияны колдонуу менен маалыматтарды сканерлейт жана PATTERNS тизмесинин негизинде тиешелүү сапты чыгарат re.search. Функция үтүр менен бөлүнгөн сапты кайтарат. Эгер сиз кадимки сөз айкашынын адиси болбосоңуз, мен муну текшерүүнү сунуштайм окуу куралы жана кодду текшерүү үчүн блокнотто машыгыңыз. Андан кийин биз ыңгайлаштырылган ParDo функциясын аныктайбыз жаруу, бул параллелдүү иштетүү үчүн Beam трансформациясынын вариациясы. Pythonдо бул өзгөчө жол менен жасалат - биз DoFn Beam классынан мураска калган класс түзүшүбүз керек. Бөлүү функциясы мурунку функциядан талданган сапты алып, биздин BigQuery таблицабыздагы тилке аттарына туура келген баскычтары менен сөздүктөрдүн тизмесин кайтарат. Бул функция жөнүндө белгилей турган бир нерсе бар: импорттоого туура келди datetime анын иштеши үчүн функциянын ичинде. Мен файлдын башында импорттоо катасын алып жаткам, бул таң калыштуу болду. Бул тизме андан кийин функцияга өткөрүлүп берилет WriteToBigQuery, бул жөн гана таблицага биздин маалыматтарды кошот. Пакеттик DataFlow жумушунун жана Streaming DataFlow жумушунун коду төмөндө келтирилген. Пакет менен агымдык коддун ортосундагы бир гана айырма - бул партияда биз CSVди окуйбуз src_pathфункцияны колдонуу ReadFromText Beam тартып.

Пакеттик DataFlow жумушу (партиялык иштетүү)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (агымдык иштетүү)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Конвейерди ишке киргизүү

Биз куурду бир нече ар кандай жолдор менен иштете алабыз. Эгерде биз кааласак, GCPге алыстан кирип жатканда аны терминалдан локалдык түрдө иштетсек болот.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Бирок, биз аны DataFlow аркылуу иштетебиз. Төмөнкү талап кылынган параметрлерди орнотуу менен төмөнкү буйрукту колдонуу менен муну жасай алабыз.

  • project — GCP долбооруңуздун идентификатору.
  • runner программаңызды талдап, түтүктү кура турган түтүк өткөргүч. Булутта иштетүү үчүн, сиз DataflowRunner көрсөтүшүңүз керек.
  • staging_location — ишти аткарып жаткан процессорлорго керектүү код пакеттерин индекстөө үчүн Cloud Dataflow булут сактагычына жол.
  • temp_location — түтүк иштеп жатканда түзүлгөн убактылуу жумуш файлдарын сактоо үчүн Cloud Dataflow булут сактагычына жол.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Бул буйрук иштеп жатканда, биз Google консолундагы DataFlow өтмөгүнө өтүп, биздин түтүктү көрө алабыз. Түтүк өткөргүчтү чыкылдатканыбызда, 4-сүрөткө окшош нерсени көрүшүбүз керек. Мүчүлүштүктөрдү оңдоо максатында, деталдуу журналдарды көрүү үчүн Logs, андан кийин Stackdriver'ге өтүү абдан пайдалуу болушу мүмкүн. Бул мага бир катар учурларда куурдагы көйгөйлөрдү чечүүгө жардам берди.

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк
4-сүрөт: нурлуу конвейер

BigQueryдеги дайындарыбызга кириңиз

Ошентип, биздин таблицага маалыматтар агып турган куурубуз болушу керек. Муну текшерүү үчүн биз BigQuery'ге барып, маалыматтарды карай алабыз. Төмөнкү буйрукту колдонгондон кийин, маалымат топтомунун биринчи бир нече саптарын көрүшүңүз керек. Эми бизде BigQueryде сакталган маалыматтар бар, биз андан ары талдоо жүргүзө алабыз, ошондой эле маалыматтарды кесиптештер менен бөлүшүп, бизнес суроолоруна жооп бере баштайбыз.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Биз маалымат агымын иштетүү түтүгүн түзөбүз. 2-бөлүк
5-сүрөт: BigQuery

жыйынтыктоо

Бул пост агымдык маалымат түтүгүн түзүүнүн, ошондой эле маалыматтарды жеткиликтүү кылуунун жолдорун табууга пайдалуу мисал болот деп үмүттөнөбүз. Бул форматта маалыматтарды сактоо бизге көптөгөн артыкчылыктарды берет. Эми биздин продуктуну канча адам колдонот деген сыяктуу маанилүү суроолорго жооп бере баштайбыз. Сиздин колдонуучу базаңыз убакыттын өтүшү менен өсүп жатабы? Адамдар продуктунун кайсы аспектилери менен көбүрөөк иштешет? Жана болбошу керек жерде каталар барбы? Мына ушул суроолор уюмду кызыктырат. Бул суроолорго жооптордон келип чыккан түшүнүктөрдүн негизинде биз өнүмдү жакшырта алабыз жана колдонуучулардын катышуусун арттыра алабыз.

Beam көнүгүүлөрдүн бул түрү үчүн чынында эле пайдалуу жана башка бир катар кызыктуу колдонуу учурлары да бар. Мисалы, сиз реалдуу убакыт режиминде запастык кенелердин маалыматтарын талдап, анализдин негизинде соода кылгыңыз келиши мүмкүн, балким, сизде транспорт каражаттарынан келген сенсор маалыматтары бар жана трафиктин деңгээлин эсептөөлөрдү эсептегиңиз келет. Ошондой эле, мисалы, колдонуучунун маалыматтарын чогултуучу жана аны негизги көрсөткүчтөрдү көзөмөлдөө үчүн панелдерди түзүү үчүн колдонгон оюн компаниясы болушу мүмкүн. Макул, мырзалар, бул башка посттун темасы, окуганыңыз үчүн рахмат жана толук кодду көрүүнү каалагандар үчүн төмөндө менин GitHub шилтемем бар.

https://github.com/DFoly/User_log_pipeline

Болгону ушул. Биринчи бөлүгүн оку.

Source: www.habr.com

Комментарий кошуу