Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2

Hi uile. Tha sinn a’ roinn eadar-theangachadh a’ phàirt mu dheireadh den artaigil, a chaidh ullachadh gu sònraichte airson oileanaich a’ chùrsa. Einnseanair dàta. Faodaidh tu a’ chiad phàirt a leughadh an seo.

Apache Beam agus DataFlow airson Pìoban Fìor-ùine

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2

A’ stèidheachadh Google Cloud

Nota: Chleachd mi Google Cloud Shell gus an loidhne-phìoban a ruith agus dàta log àbhaisteach fhoillseachadh oir bha duilgheadas agam a bhith a’ ruith na loidhne-phìoban ann am Python 3. Bidh Google Cloud Shell a’ cleachdadh Python 2, a tha nas cunbhalaiche ri Apache Beam.

Gus an loidhne-phìoban a thòiseachadh, feumaidh sinn beagan a chladhach a-steach do na roghainnean. Dhaibhsan agaibhse nach do chleachd GCP roimhe seo, feumaidh tu na 6 ceumannan a leanas a tha air am mìneachadh ann an seo a leantainn duilleag.

Às deidh seo, feumaidh sinn na sgriobtaichean againn a luchdachadh suas gu Google Cloud Storage agus an lethbhreac gu ar Google Cloud Shel. Tha luchdachadh suas gu stòradh neòil gu math beag (gheibhear tuairisgeul an seo). Gus na faidhlichean againn a chopaigeadh, is urrainn dhuinn Google Cloud Shel fhosgladh bhon bhàr-inneal le bhith a’ briogadh air a’ chiad ìomhaigh air an taobh chlì ann am Figear 2 gu h-ìosal.

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2
Figear 2

Tha na h-òrdughan a dh’ fheumas sinn airson na faidhlichean a chopaigeadh agus na leabharlannan riatanach a stàladh air an liostadh gu h-ìosal.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Cruthachadh ar stòr-dàta agus clàr

Aon uair ‘s gu bheil sinn air na ceumannan co-cheangailte ri stèidheachadh a chrìochnachadh, is e an ath rud a dh’ fheumas sinn a dhèanamh a bhith a’ cruthachadh stòr-dàta agus clàr ann am BigQuery. Tha grunn dhòighean ann seo a dhèanamh, ach is e an rud as sìmplidh consol Google Cloud a chleachdadh le bhith a’ cruthachadh stòr-dàta an-toiseach. Faodaidh tu na ceumannan gu h-ìosal a leantainn cheangalgus clàr a chruthachadh le sgeama. Bidh am bòrd againn 7 colbhan, a rèir co-phàirtean gach log cleachdaiche. Airson goireasachd, mìnichidh sinn a h-uile colbh mar shreathan, ach a-mhàin an caochladair timelocal, agus ainmich sinn iad a rèir nan caochladairean a chruthaich sinn na bu thràithe. Bu chòir gum biodh cruth a’ bhùird againn coltach ri Figear 3.

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2
Рисунок 3. Схема таблицы

Foillsich dàta log luchd-cleachdaidh

Tha Pub/Sub na phàirt riatanach den loidhne-phìoban againn oir leigidh e le grunn thagraidhean neo-eisimeileach conaltradh le chèile. Gu sònraichte, bidh e ag obair mar eadar-mheadhanair a leigeas leinn teachdaireachdan a chuir agus fhaighinn eadar tagraidhean. Is e a’ chiad rud a dh’fheumas sinn a dhèanamh cuspair a chruthachadh. Dìreach rachaibh gu Pub/Sub sa chonsail agus cliog air CREATE TOPIC.

Bidh an còd gu h-ìosal a’ gairm ar sgriobt gus an dàta log a tha air a mhìneachadh gu h-àrd a ghineadh agus an uairsin a’ ceangal agus a ’cur na logaichean gu Pub / Sub. Is e an aon rud a dh'fheumas sinn a dhèanamh rud a chruthachadh FoillsichearClient, sònraich an t-slighe chun chuspair a’ cleachdadh an dòigh topic_path agus gairm an gnìomh publish с topic_path agus dàta. Thoir an aire gu bheil sinn a’ toirt a-steach generate_log_line o'n sgriobtuir stream_logs, mar sin dèan cinnteach gu bheil na faidhlichean sin san aon phasgan, air neo gheibh thu mearachd in-mhalairt. Faodaidh sinn an uairsin seo a ruith tron ​​chonsal google againn a’ cleachdadh:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Cho luath ‘s a bhios am faidhle a’ ruith, bidh e comasach dhuinn toradh an dàta log fhaicinn chun consol, mar a chithear san fhigear gu h-ìosal. Obraichidh an sgriobt seo cho fad 's nach cleachd sinn CTRL + Cgus a chrìochnachadh.

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2
Figear 4. Toradh publish_logs.py

Sgrìobhadh ar còd loidhne-phìoban

A-nis gu bheil a h-uile càil air ullachadh, is urrainn dhuinn am pàirt spòrsail a thòiseachadh - a’ còdadh ar loidhne-phìoban a’ cleachdadh Beam agus Python. Gus loidhne-phìoban Beam a chruthachadh, feumaidh sinn rud loidhne-phìoban a chruthachadh (p). Aon uair ‘s gu bheil sinn air rud loidhne-phìoban a chruthachadh, is urrainn dhuinn grunn ghnìomhan a chuir an sàs aon às deidh a chèile a’ cleachdadh a ’ghnìomhaiche pipe (|). San fharsaingeachd, tha an sruth-obrach coltach ris an ìomhaigh gu h-ìosal.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Anns a’ chòd againn, cruthaichidh sinn dà ghnìomh àbhaisteach. Gnìomh regex_clean, a bhios a’ sganadh an dàta agus a’ faighinn air ais an t-sreath fhreagarrach stèidhichte air liosta PATTERNS a’ cleachdadh a’ ghnìomh re.search. Bidh an gnìomh a’ tilleadh sreang sgaraichte le cromag. Mura h-eil thu nad eòlaiche faireachdainn cunbhalach, tha mi a’ moladh sùil a thoirt air seo teagaisg agus cleachd ann an notepad gus an còd a sgrùdadh. Às deidh seo bidh sinn a’ mìneachadh gnìomh ParDo àbhaisteach ris an canar Split, a tha na atharrachadh air cruth-atharrachadh Beam airson giollachd co-shìnte. Ann am Python, tha seo air a dhèanamh ann an dòigh shònraichte - feumaidh sinn clas a chruthachadh a gheibh seilbh bhon chlas DoFn Beam. Gabhaidh an gnìomh Split an loidhne parsed bhon ghnìomh roimhe agus tillidh e liosta fhaclairean le iuchraichean a fhreagras air ainmean nan colbhan sa chlàr BigQuery againn. Tha rudeigin ri thoirt fa-near mun ghnìomh seo: bha agam ri toirt a-steach datetime taobh a-staigh gnìomh gus toirt air obrachadh. Bha mi a’ faighinn mearachd in-mhalairt aig toiseach an fhaidhle, rud a bha neònach. Thèid an liosta seo an uairsin a thoirt don ghnìomh WriteToBigQuery, a tha dìreach a’ cur ar dàta ris a’ chlàr. Tha an còd airson Batch DataFlow Job agus Streaming DataFlow Job air a thoirt seachad gu h-ìosal. Is e an aon eadar-dhealachadh eadar baidse agus còd sruthadh gun leugh sinn an CSV ann am baidse src_patha’ cleachdadh a’ ghnìomh ReadFromText bho Beam.

Iob Batch DataFlow (giollachd baidse)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Sruth DataFlow Job (giollachd sruthan)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

A 'tòiseachadh air a' ghiùlan

Faodaidh sinn an loidhne-phìoban a ruith ann an grunn dhòighean eadar-dhealaichte. Nam biodh sinn ag iarraidh, b’ urrainn dhuinn a ruith gu h-ionadail bho cheann-uidhe fhad ‘s a bha sinn a’ logadh a-steach gu GCP air astar.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Ach, tha sinn gu bhith ga ruith a’ cleachdadh DataFlow. Is urrainn dhuinn seo a dhèanamh leis an àithne gu h-ìosal le bhith a’ suidheachadh nam paramadairean riatanach a leanas.

  • project - ID den phròiseact GCP agad.
  • runner na ruitheadair loidhne-phìoban a nì sgrùdadh air a’ phrògram agad agus a thogas do loidhne-phìoban. Gus ruith san sgòth, feumaidh tu DataflowRunner a shònrachadh.
  • staging_location - an t-slighe gu stòradh sgòthan Cloud Dataflow airson pasganan còd clàr-amais a dh ’fheumas na pròiseasairean a tha a’ coileanadh na h-obrach.
  • temp_location - slighe gu stòradh neòil Cloud Dataflow airson faidhlichean obrach sealach a stòradh fhad ‘s a tha an loidhne-phìoban a’ ruith.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Fhad ‘s a tha an àithne seo a’ ruith, is urrainn dhuinn a dhol chun taba DataFlow ann an consol google agus coimhead air an loidhne-phìoban againn. Nuair a phutas sinn air an loidhne-phìoban, bu chòir dhuinn rudeigin coltach ri Figear 4 fhaicinn. Airson adhbharan debugging, faodaidh e a bhith gu math cuideachail a dhol gu Logaichean agus an uairsin gu Stackdriver gus na logaichean mionaideach fhaicinn. Tha seo air mo chuideachadh le bhith a’ fuasgladh chùisean loidhne-phìoban ann an grunn chùisean.

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2
Figear 4: Beam ghiùladair

Faigh cothrom air an dàta againn ann am BigQuery

Mar sin, bu chòir loidhne-phìoban a bhith againn mu thràth le dàta a’ sruthadh a-steach don bhòrd againn. Gus seo a dhearbhadh, is urrainn dhuinn a dhol gu BigQuery agus coimhead air an dàta. Às deidh dhut an àithne gu h-ìosal a chleachdadh bu chòir dhut a’ chiad beagan shreathan den stòr-dàta fhaicinn. A-nis gu bheil an dàta againn air a stòradh ann am BigQuery, is urrainn dhuinn tuilleadh sgrùdaidh a dhèanamh, a bharrachd air an dàta a cho-roinn le co-obraichean agus tòiseachadh air ceistean gnìomhachais a fhreagairt.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Bidh sinn a’ cruthachadh loidhne-phìoban giollachd dàta sruthan. Pàirt 2
Figear 5: BigQuery

co-dhùnadh

Tha sinn an dòchas gum bi an dreuchd seo na eisimpleir feumail de bhith a’ cruthachadh loidhne-phìoban dàta sruthadh, a bharrachd air dòighean a lorg gus dàta a dhèanamh nas ruigsinneach. Bheir stòradh dàta sa chruth seo mòran bhuannachdan dhuinn. A-nis is urrainn dhuinn tòiseachadh air ceistean cudromach a fhreagairt mar cia mheud duine a bhios a’ cleachdadh ar toradh? A bheil do bhunait luchd-cleachdaidh a’ fàs thar ùine? Dè na taobhan den toradh a bhios daoine ag eadar-obrachadh leis as motha? Agus a bheil mearachdan ann far nach bu chòir a bhith ann? Is iad seo na ceistean a bhios inntinneach don bhuidheann. Stèidhichte air na beachdan a tha a’ nochdadh bho fhreagairtean nan ceistean sin, is urrainn dhuinn an toradh adhartachadh agus conaltradh luchd-cleachdaidh a mheudachadh.

Tha beam air leth feumail airson an seòrsa eacarsaich seo agus tha grunn chùisean cleachdaidh inntinneach eile ann cuideachd. Mar eisimpleir, is dòcha gum bi thu airson mion-sgrùdadh a dhèanamh air dàta strìochag stoc ann an àm fìor agus ciùird a dhèanamh stèidhichte air an anailis, is dòcha gu bheil dàta mothachaidh agad a’ tighinn bho charbadan agus gu bheil thu airson àireamhachadh ìre trafaic obrachadh a-mach. Dh’ fhaodadh tu cuideachd, mar eisimpleir, a bhith nad chompanaidh gèam a bhios a’ cruinneachadh dàta luchd-cleachdaidh agus ga chleachdadh gus deas-bhòrdan a chruthachadh gus sùil a chumail air prìomh mheatrics. Ceart gu leòr, a dhaoine uaisle, is e cuspair a tha seo airson post eile, taing airson leughadh, agus dhaibhsan a tha airson an còd slàn fhaicinn, gu h-ìosal tha an ceangal gu mo GitHub.

https://github.com/DFoly/User_log_pipeline

Tha sin uile. Leugh pàirt a h-aon.

Source: www.habr.com

Cuir beachd ann