Cruthaímid píblíne próiseála sonraí srutha. Cuid 2

Dia duit gach duine. Tá aistriúchán na coda deiridh den alt á roinnt againn, a ullmhaíodh go sonrach do mhic léinn an chúrsa. Innealtóir Sonraí. Is féidir leat an chéad chuid a léamh anseo.

Bhíoma Apache agus DataFlow le haghaidh Píblínte Fíor-Ama

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2

Google Cloud a shocrú

Nóta: D'úsáid mé Google Cloud Shell chun an píblíne a reáchtáil agus sonraí logála saincheaptha a fhoilsiú toisc go raibh deacracht agam an píblíne a reáchtáil i Python 3. Úsáideann Google Cloud Shell Python 2, atá níos comhsheasmhaí le Apache Beam.

Chun an píblíne a thosú, ní mór dúinn beagán a thochailt isteach sna socruithe. Dóibh siúd agaibh nár bhain úsáid as GCP roimhe seo, beidh ort na 6 chéim seo a leanas atá leagtha amach anseo a leanúint leathanach.

Tar éis seo, beidh orainn ár scripteanna a uaslódáil chuig Google Cloud Storage agus iad a chóipeáil chuig ár Google Cloud Shel. Is fánach go leor é a uaslódáil chuig stóráil néil (is féidir cur síos a fháil anseo). Chun ár gcomhaid a chóipeáil, is féidir linn Google Cloud Shel a oscailt ón mbarra uirlisí trí chliceáil ar an gcéad deilbhín ar chlé i bhFíor 2 thíos.

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2
Figiúr 2

Tá na horduithe a theastaíonn uainn chun na comhaid a chóipeáil agus na leabharlanna riachtanacha a shuiteáil liostaithe thíos.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Ár mbunachar sonraí agus tábla a chruthú

Nuair a bheidh na céimeanna go léir a bhaineann le socrú críochnaithe againn, is é an chéad rud eile is gá dúinn a dhéanamh ná tacar sonraí agus tábla a chruthú in BigQuery. Tá roinnt bealaí ann chun é seo a dhéanamh, ach is é an ceann is simplí consól Google Cloud a úsáid trí thacair sonraí a chruthú ar dtús. Is féidir leat na céimeanna thíos a leanúint nascchun tábla a chruthú le scéimre. Beidh ar ár mbord 7 gcolún, a fhreagraíonn do chomhpháirteanna gach logáil úsáideora. Ar mhaithe le caoithiúlacht, déanfaimid gach colún a shainiú mar teaghráin, ach amháin i gcás an athróg timelocal, agus ainmnigh iad de réir na n-athróg a ghin muid níos luaithe. Ba cheart go mbeadh an chuma ar leagan amach ár dtábla i bhFíor 3.

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2
Fíor 3. Leagan amach an tábla

Sonraí logála úsáideoirí a fhoilsiú

Is comhpháirt ríthábhachtach dár bpíblíne é teach tábhairne/fo-theach mar go ligeann sé d’fheidhmchláir neamhspleácha iolracha cumarsáid a dhéanamh lena chéile. Go háirithe, oibríonn sé mar idirghabhálaí a ligeann dúinn teachtaireachtaí a sheoladh agus a fháil idir iarratais. Is é an chéad rud a chaithfimid a dhéanamh ná topaic a chruthú. Níl le déanamh ach dul go dtí teach tábhairne/fo sa chonsól agus cliceáil CRUTHAIGH ÁBHAR.

Glaonn an cód thíos ar ár script chun na sonraí logála atá sainmhínithe thuas a ghiniúint agus ansin nascann sé agus seolann sé na logaí chuig Pub/Fo. Is é an t-aon rud is gá dúinn a dhéanamh ná réad a chruthú FoilsitheoirClient, sonraigh an cosán go dtí an topaic ag baint úsáide as an modh topic_path agus glaoch ar an fheidhm publish с topic_path agus sonraí. Tabhair faoi deara go ndéanaimid iompórtáil generate_log_line ó'n script stream_logs, mar sin déan cinnte go bhfuil na comhaid seo san fhillteán céanna, ar shlí eile gheobhaidh tú earráid allmhairithe. Is féidir linn é seo a rith tríd ár gconsól google ag baint úsáide as:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Chomh luath agus a ritheann an comhad, beidh muid in ann aschur na sonraí logála chuig an consól a fheiceáil, mar a thaispeántar san fhigiúr thíos. Oibreoidh an script seo chomh fada agus nach n-úsáidfimid CTRL + Cchun é a chríochnú.

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2
Fíor 4. Aschur publish_logs.py

Scríobh ár gcód píblíne

Anois go bhfuil gach rud ullmhaithe againn, is féidir linn an chuid spraoi a thosú - ár bpíblíne a chódú ag baint úsáide as Beam agus Python. Chun píblíne Bhíoma a chruthú, ní mór dúinn réad píblíne a chruthú (p). Nuair a bheidh réad píblíne cruthaithe againn, is féidir linn feidhmeanna iolracha a chur i bhfeidhm ceann i ndiaidh a chéile ag baint úsáide as an oibreoir pipe (|). Go ginearálta, is cosúil leis an sreabhadh oibre an íomhá thíos.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

In ár gcód, cruthóimid dhá fheidhm saincheaptha. Feidhm regex_clean, a dhéanann scanadh ar na sonraí agus a aisghabhann an tsraith chomhfhreagrach bunaithe ar an liosta PATRÚIN ag baint úsáide as an bhfeidhm re.search. Filleann an fheidhm teaghrán scartha le camóg. Mura saineolaí cainte rialta tú, molaim é seo a sheiceáil teagaisc agus cleachtadh i leabhar nótaí chun an cód a sheiceáil. Tar éis seo sainímid feidhm ParDo saincheaptha ar a dtugtar Scoilt, atá ina athrú ar an Bhíoma athrú le haghaidh próiseála comhthreomhar. I Python, déantar é seo ar bhealach speisialta - ní mór dúinn rang a chruthú a fhaigheann oidhreacht ó rang DoFn Beam. Tógann an fheidhm Scoilte an ró pharsáilte ón bhfeidhm roimhe seo agus seolann sí ar ais liosta d’fhoclóirí le heochracha a chomhfhreagraíonn d’ainmneacha na gcolún inár dtábla BigQuery. Tá rud éigin le tabhairt faoi deara faoin bhfeidhm seo: bhí orm iompórtáil datetime taobh istigh d'fheidhm chun go n-oibreoidh sé. Bhí earráid iompórtála á fháil agam ag tús an chomhaid, rud a bhí aisteach. Cuirtear an liosta seo ar aghaidh chuig an bhfeidhm ansin WriteToBigQuery, a chuireann go simplí ár sonraí leis an tábla. Tá an cód le haghaidh Baisc DataFlow Job agus Streaming DataFlow Job tugtha thíos. Is é an t-aon difríocht idir cód baisce agus cód sruthaithe ná gur léigh muid an CSV i mbaisc src_pathag baint úsáide as an fheidhm ReadFromText ó Bhíoma.

Jab Baisc DataFlow (baiscphróiseáil)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Sruthú DataFlow Job (próiseáil srutha)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ag tosú an iompair

Is féidir linn an phíblíne a reáchtáil ar bhealaí éagsúla. Dá mba mhian linn, d’fhéadfaimis é a rith go háitiúil ó chríochfort agus logáil isteach go cianda ar GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Mar sin féin, táimid chun é a rith ag baint úsáide as DataFlow. Is féidir linn é seo a dhéanamh ag baint úsáide as an ordú thíos trí na paraiméadair riachtanacha seo a leanas a leagan síos.

  • project — Aitheantas do thionscadal GCP.
  • runner is rádala píblíne é a dhéanfaidh anailís ar do chlár agus a thógfaidh do phíblíne. Chun rith sa scamall, ní mór duit DataflowRunner a shonrú.
  • staging_location — an cosán chuig stóráil scamall Cloud Dataflow chun pacáistí cód a innéacsú a theastaíonn ó na próiseálaithe atá i mbun na hoibre.
  • temp_location — cosán chuig stóras scamall Cloud Dataflow chun comhaid poist shealadacha a stóráil a cruthaíodh agus an phíblíne ar siúl.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Cé go bhfuil an t-ordú seo ar siúl, is féidir linn dul go dtí an cluaisín DataFlow sa chonsól google agus féachaint ar ár bpíblíne. Nuair a chliceálann muid ar an bpíblíne, ba cheart dúinn rud éigin cosúil le Fíor 4 a fheiceáil. Chun críocha dífhabhtaithe, is féidir go mbeadh sé an-chabhrach dul chuig Logs agus ansin chuig Stackdriver chun logaí mionsonraithe a fheiceáil. Chuidigh sé seo liom saincheisteanna píblíne a réiteach i roinnt cásanna.

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2
Fíor 4: Iompar bhíoma

Faigh rochtain ar ár sonraí in BigQuery

Mar sin, ba cheart go mbeadh píblíne againn cheana féin agus sonraí ag sileadh isteach inár tábla. Chun é seo a thástáil, is féidir linn dul go dtí BigQuery agus breathnú ar na sonraí. Tar éis duit an t-ordú thíos a úsáid ba cheart duit na chéad chúpla sraitheanna den tacar sonraí a fheiceáil. Anois go bhfuil na sonraí stóráilte againn in BigQuery, is féidir linn tuilleadh anailíse a dhéanamh, chomh maith leis na sonraí a roinnt le comhghleacaithe agus tosú ag freagairt ceisteanna gnó.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Cruthaímid píblíne próiseála sonraí srutha. Cuid 2
Fíor 5: BigQuery

Conclúid

Tá súil againn go mbeidh an post seo ina shampla úsáideach de phíblíne sonraí sruthaithe a chruthú, chomh maith le bealaí a aimsiú chun sonraí a dhéanamh níos inrochtana. Tugann stóráil sonraí san fhormáid seo go leor buntáistí dúinn. Anois is féidir linn tosú ag freagairt ceisteanna tábhachtacha cosúil le cé mhéad duine a úsáideann ár dtáirge? An bhfuil do bhonn úsáideora ag fás le himeacht ama? Cad iad na gnéithe den táirge a mbíonn an caidreamh is mó ag daoine leo? Agus an bhfuil earráidí ann nár cheart a bheith ann? Seo iad na ceisteanna a bheidh ina ábhar spéise don eagraíocht. Bunaithe ar na léargais a eascraíonn as na freagraí ar na ceisteanna seo, is féidir linn an táirge a fheabhsú agus rannpháirtíocht úsáideoirí a mhéadú.

Tá Beam an-úsáideach don chineál seo aclaíochta agus tá roinnt cásanna úsáide suimiúla eile ann freisin. Mar shampla, b'fhéidir gur mhaith leat anailís a dhéanamh ar shonraí ticbhosca stoic i bhfíor-am agus ceirdeanna a dhéanamh bunaithe ar an anailís, b'fhéidir go bhfuil sonraí braiteora agat ag teacht ó fheithiclí agus gur mhaith leat ríomhaireachtaí leibhéal tráchta a ríomh. D'fhéadfá freisin, mar shampla, a bheith i do chuideachta cearrbhachais a bhailíonn sonraí úsáideoirí agus a úsáideann é chun deais a chruthú chun príomhmhéadracht a rianú. Maith go leor, a dhaoine uaisle, is ábhar é seo do phost eile, go raibh maith agat as léamh, agus dóibh siúd ar mian leo an cód iomlán a fheiceáil, thíos tá an nasc chuig mo GitHub.

https://github.com/DFoly/User_log_pipeline

Sin go léir. Léigh cuid a haon.

Foinse: will.com

Add a comment