Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2

Salaam wote. Tunashiriki tafsiri ya sehemu ya mwisho ya makala, iliyotayarishwa mahsusi kwa wanafunzi wa kozi hiyo. Mhandisi wa Data. Unaweza kusoma sehemu ya kwanza hapa.

Apache Beam na DataFlow kwa Mabomba ya Wakati Halisi

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2

Inasanidi Wingu la Google

Kumbuka: Nilitumia Google Cloud Shell kuendesha bomba na kuchapisha data ya kumbukumbu maalum kwa sababu nilikuwa na matatizo ya kuendesha bomba katika Python 3. Google Cloud Shell hutumia Python 2, ambayo inalingana zaidi na Apache Beam.

Ili kuanza bomba, tunahitaji kuchimba kidogo kwenye mipangilio. Kwa wale ambao hamjatumia GCP hapo awali, utahitaji kufuata hatua 6 zifuatazo zilizoainishwa katika hili ukurasa.

Baada ya haya, tutahitaji kupakia hati zetu kwenye Hifadhi ya Wingu la Google na kuzinakili kwenye Shel yetu ya Wingu la Google. Kupakia kwenye hifadhi ya wingu ni jambo dogo sana (maelezo yanaweza kupatikana hapa) Ili kunakili faili zetu, tunaweza kufungua Google Cloud Shel kutoka kwa upau wa vidhibiti kwa kubofya ikoni ya kwanza iliyo upande wa kushoto katika Mchoro 2 hapa chini.

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2
Kielelezo 2

Amri tunazohitaji kunakili faili na kusakinisha maktaba zinazohitajika zimeorodheshwa hapa chini.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Kuunda hifadhidata yetu na jedwali

Mara tu tunapokamilisha hatua zote zinazohusiana na usanidi, jambo linalofuata tunalohitaji kufanya ni kuunda mkusanyiko wa data na jedwali katika BigQuery. Kuna njia kadhaa za kufanya hivyo, lakini rahisi zaidi ni kutumia kiweko cha Wingu la Google kwa kuunda seti ya data kwanza. Unaweza kufuata hatua zilizo hapa chini kiungokuunda meza na schema. Meza yetu itakuwa na 7 safu, sambamba na vipengele vya kila logi ya mtumiaji. Kwa urahisi, tutafafanua nguzo zote kama kamba, isipokuwa kwa kutofautiana kwa wakati, na kuzitaja kulingana na vigezo tulivyozalisha mapema. Mpangilio wa jedwali letu unapaswa kuonekana kama kwenye Kielelezo 3.

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2
Kielelezo 3. Mpangilio wa jedwali

Kuchapisha data ya kumbukumbu ya mtumiaji

Pub/Sub ni sehemu muhimu ya bomba letu kwa sababu inaruhusu programu nyingi zinazojitegemea kuwasiliana. Hasa, inafanya kazi kama mpatanishi ambayo inaruhusu sisi kutuma na kupokea ujumbe kati ya programu. Jambo la kwanza tunalohitaji kufanya ni kuunda mada. Nenda tu kwa Pub/Sub kwenye kiweko na ubofye CREATE TOPIC.

Msimbo ulio hapa chini huita hati yetu kutoa data ya kumbukumbu iliyofafanuliwa hapo juu na kisha kuunganisha na kutuma kumbukumbu hizo kwa Pub/Sub. Kitu pekee tunachohitaji kufanya ni kuunda kitu MchapishajiMteja, taja njia ya mada kwa kutumia mbinu topic_path na piga kazi publish с topic_path na data. Tafadhali kumbuka kuwa tunaagiza generate_log_line kutoka kwa hati yetu stream_logs, kwa hivyo hakikisha kuwa faili hizi ziko kwenye folda moja, vinginevyo utapata hitilafu ya kuingiza. Kisha tunaweza kuendesha hii kupitia google console yetu kwa kutumia:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Mara tu faili inapoendesha, tutaweza kuona matokeo ya data ya kumbukumbu kwenye koni, kama inavyoonyeshwa kwenye takwimu hapa chini. Hati hii itafanya kazi mradi hatutumii CTRL + Ckuikamilisha.

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2
Kielelezo 4. Pato publish_logs.py

Kuandika msimbo wetu wa bomba

Sasa kwa kuwa tumetayarisha kila kitu, tunaweza kuanza sehemu ya kufurahisha - kusimba bomba letu kwa kutumia Beam na Python. Ili kuunda bomba la Boriti, tunahitaji kuunda kitu cha bomba (p). Mara tu tunapounda kipengee cha bomba, tunaweza kutumia kazi nyingi moja baada ya nyingine kwa kutumia opereta pipe (|). Kwa ujumla, mtiririko wa kazi unaonekana kama picha hapa chini.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Katika kanuni yetu, tutaunda kazi mbili za desturi. Kazi regex_clean, ambayo huchanganua data na kupata safu mlalo inayolingana kulingana na orodha ya PATTERNS kwa kutumia chaguo la kukokotoa re.search. Chaguo za kukokotoa hurejesha mfuatano uliotenganishwa kwa koma. Ikiwa wewe si mtaalamu wa kujieleza mara kwa mara, napendekeza uangalie hili mafunzo na ufanye mazoezi kwenye daftari ili kuangalia nambari. Baada ya haya tunafafanua kitendakazi maalum cha ParDo kinachoitwa Kupasuliwa, ambayo ni tofauti ya ubadilishaji wa Boriti kwa usindikaji sambamba. Katika Python, hii inafanywa kwa njia maalum - lazima tuunda darasa ambalo linarithi kutoka kwa darasa la DoFn Beam. Chaguo la kukokotoa la Mgawanyiko huchukua safu mlalo iliyochanganuliwa kutoka kwa chaguo la kukokotoa lililotangulia na kurudisha orodha ya kamusi iliyo na vitufe vinavyolingana na majina ya safu wima katika jedwali letu la BigQuery. Kuna kitu cha kuzingatia kuhusu kazi hii: Ilinibidi kuagiza datetime ndani ya kitendaji ili kuifanya ifanye kazi. Nilikuwa nikipata hitilafu ya uingizaji mwanzoni mwa faili, ambayo ilikuwa ya kushangaza. Orodha hii kisha hupitishwa kwa chaguo za kukokotoa AndikaToBigQuery, ambayo huongeza tu data yetu kwenye jedwali. Msimbo wa Kazi ya Kundi la DataFlow na Kazi ya Utiririshaji wa Data umepewa hapa chini. Tofauti pekee kati ya kundi na msimbo wa utiririshaji ni kwamba katika kundi tunasoma CSV kutoka src_pathkutumia kipengele ReadFromText kutoka kwa Beam.

Kazi ya Batch DataFlow (uchakataji wa bechi)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Kazi ya Utiririshaji wa Data (uchakataji wa mtiririko)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Kuanzisha conveyor

Tunaweza kuendesha bomba kwa njia kadhaa tofauti. Ikiwa tungetaka, tunaweza kuiendesha ndani kutoka kwa terminal huku tukiingia kwenye GCP kwa mbali.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Walakini, tutaiendesha kwa kutumia DataFlow. Tunaweza kufanya hivyo kwa kutumia amri ifuatayo kwa kuweka vigezo vifuatavyo vinavyohitajika.

  • project - Kitambulisho cha mradi wako wa GCP.
  • runner ni mkimbiaji bomba ambaye atachambua programu yako na kuunda bomba lako. Ili kuendesha katika wingu, lazima ubainishe DataflowRunner.
  • staging_location β€” njia ya hifadhi ya wingu ya Cloud Dataflow kwa kuorodhesha vifurushi vya msimbo vinavyohitajika na wasindikaji wanaofanya kazi.
  • temp_location β€” njia ya hifadhi ya wingu ya Cloud Dataflow kwa kuhifadhi faili za kazi za muda zilizoundwa wakati bomba linaendelea.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Wakati amri hii inaendeshwa, tunaweza kwenda kwenye kichupo cha DataFlow kwenye kiweko cha google na kutazama bomba letu. Tunapobofya kwenye bomba, tunapaswa kuona kitu sawa na Mchoro 4. Kwa madhumuni ya utatuzi, inaweza kusaidia sana kwenda kwenye Kumbukumbu na kisha kwa Stackdriver ili kuona kumbukumbu za kina. Hii imenisaidia kutatua masuala ya bomba katika visa kadhaa.

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2
Kielelezo cha 4: Kisafirishaji cha boriti

Fikia data yetu katika BigQuery

Kwa hivyo, tunapaswa kuwa tayari na bomba linaloendesha na data inapita kwenye meza yetu. Ili kujaribu hili, tunaweza kwenda kwa BigQuery na kuangalia data. Baada ya kutumia amri hapa chini unapaswa kuona safu chache za kwanza za seti ya data. Kwa kuwa sasa tuna data iliyohifadhiwa katika BigQuery, tunaweza kufanya uchambuzi zaidi, na pia kushiriki data na wenzetu na kuanza kujibu maswali ya biashara.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Tunaunda bomba la usindikaji wa data ya mkondo. Sehemu ya 2
Kielelezo cha 5: BigQuery

Hitimisho

Tunatumahi kuwa chapisho hili litakuwa mfano muhimu wa kuunda bomba la data ya utiririshaji, na pia kutafuta njia za kufanya data kufikiwa zaidi. Kuhifadhi data katika umbizo hili hutupatia faida nyingi. Sasa tunaweza kuanza kujibu maswali muhimu kama vile ni watu wangapi wanaotumia bidhaa zetu? Je, watumiaji wako wanaongezeka kwa wakati? Ni vipengele vipi vya bidhaa ambavyo watu huingiliana navyo zaidi? Na kuna makosa ambapo haipaswi kuwa? Haya ni maswali ambayo yatakuwa ya manufaa kwa shirika. Kulingana na maarifa yanayotokana na majibu ya maswali haya, tunaweza kuboresha bidhaa na kuongeza ushiriki wa watumiaji.

Beam ni muhimu sana kwa aina hii ya mazoezi na ina idadi ya matukio mengine ya kuvutia ya utumiaji pia. Kwa mfano, unaweza kutaka kuchanganua data ya tiki ya hisa kwa wakati halisi na kufanya biashara kulingana na uchanganuzi, labda una data ya vitambuzi kutoka kwa magari na ungependa kukokotoa hesabu za kiwango cha trafiki. Unaweza pia, kwa mfano, kuwa kampuni ya michezo ya kubahatisha inayokusanya data ya mtumiaji na kuitumia kuunda dashibodi ili kufuatilia vipimo muhimu. Sawa, waungwana, hii ni mada ya chapisho lingine, asante kwa kusoma, na kwa wale wanaotaka kuona nambari kamili, hapa chini ni kiunga cha GitHub yangu.

https://github.com/DFoly/User_log_pipeline

Hiyo ndiyo yote. Soma sehemu ya kwanza.

Chanzo: mapenzi.com

Kuongeza maoni