Building a streaming data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of this article, prepared especially for students of the "Data Engineer" course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I was having problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.

Before we can start the pipeline, we need to do a little setup. Those of you who have not used GCP before will need to follow the 6 steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them to Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the necessary libraries are listed below.

# Copy the files from Cloud Storage into the current directory
gsutil cp gs://<YOUR-BUCKET>/* .
# Install the required libraries
sudo pip install "apache-beam[gcp]" oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create the dataset there first. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings, except for the timelocal variable, and name them after the variables we generated earlier. The layout of our table should look like Figure 3.

Figure 3. Table schema
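As a quick sanity check, the schema string that the pipeline code later in this article passes to WriteToBigQuery packs all seven columns into comma-separated `name:TYPE` pairs. The sketch below splits such a string into (column, type) pairs so you can compare it with the table you created in the console. The helper `parse_schema` is hypothetical, not a BigQuery or Beam API.

```python
# Schema string in the same "name:TYPE, name:TYPE" form used by the pipeline code
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def parse_schema(schema):
    """Hypothetical helper: turn 'name:TYPE, ...' into a list of (name, TYPE) tuples."""
    columns = []
    for field in schema.split(','):
        # Each field looks like " name:TYPE"; drop the surrounding spaces first
        name, col_type = field.strip().split(':')
        columns.append((name, col_type))
    return columns


if __name__ == '__main__':
    for name, col_type in parse_schema(SCHEMA):
        print(name, col_type)
```

If the printed names and types differ from the table in Figure 3, fix one or the other before starting the pipeline.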

Publishing the user log data

Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
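To make the middleman role concrete, here is a toy, in-memory stand-in for a topic. It is not the Pub/Sub API: the `ToyTopic` class and its methods are hypothetical. It only illustrates that a publisher hands messages to a topic without knowing which subscribers will receive them, and that each subscription gets its own copy.

```python
class ToyTopic:
    """Hypothetical in-memory topic, NOT the Pub/Sub API: fans messages out to subscriptions."""

    def __init__(self, name):
        self.name = name
        self._queues = {}  # subscription name -> list of undelivered messages

    def subscribe(self, subscription):
        # A subscription only receives messages published after it exists
        self._queues.setdefault(subscription, [])

    def publish(self, data):
        # Like Pub/Sub, the toy only accepts bytes payloads
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes")
        for queue in self._queues.values():
            queue.append(data)

    def pull(self, subscription):
        # Hand over everything waiting for this subscription and clear its queue
        messages = self._queues[subscription]
        self._queues[subscription] = []
        return messages


if __name__ == '__main__':
    topic = ToyTopic("userlogs")
    topic.subscribe("beam-pipeline")    # e.g. the Beam job
    topic.subscribe("audit")            # a second, independent consumer
    topic.publish(b"log line 1")        # the publisher only knows the topic
    print(topic.pull("beam-pipeline"))  # [b'log line 1']
    print(topic.pull("audit"))          # [b'log line 1']
```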

The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs there. All we need to do is create a PublisherClient object, specify the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can run this from our Google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
# Fully qualified topic name: projects/<PROJECT_ID>/topics/<TOPIC>
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub message data must be bytes, so encode the log line first
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # result() is the message ID assigned by Pub/Sub
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        # Wait 1 or 2 seconds before publishing the next line
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the script is running, we can see the log data being printed to the console, as shown in the figure below. The script will keep running until we press CTRL+C to stop it.

Figure 4. Output of publish_logs.py
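The publish call does not block: it returns a future, and callback runs once that future resolves. You can try the same pattern locally with the standard library's concurrent.futures, whose Future offers the add_done_callback, exception and result methods used in the script above. In this sketch, `fake_publish` is a hypothetical stand-in for publisher.publish that returns a made-up message ID. No Pub/Sub call is made.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_publish(data):
    """Hypothetical stand-in for publisher.publish: no network call, returns a fake message ID."""
    if not data:
        raise ValueError("empty message")
    return "msg-{}".format(len(data))


def publish_all(lines):
    results = []

    def callback(message_future):
        # Same structure as the script's callback: report the error or the message ID
        error = message_future.exception(timeout=30)
        if error:
            results.append("error: {}".format(error))
        else:
            results.append(message_future.result())

    with ThreadPoolExecutor(max_workers=2) as executor:
        for line in lines:
            future = executor.submit(fake_publish, line)
            future.add_done_callback(callback)
    # Leaving the with-block waits for every task, so all callbacks have run
    return sorted(results)


if __name__ == '__main__':
    print(publish_all([b"first log line", b""]))  # ['error: empty message', 'msg-14']
```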

Writing our pipeline code

Now that everything is set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have the pipeline object, we can apply several functions one after another using the pipe operator (|). In general, the workflow looks like this:

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
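In Beam, each `|` applies a transform to the collection produced by the previous step. The plain-Python analogue below does not involve Beam at all, and the `run_steps` helper is hypothetical. It only shows the ordering: the output of one step becomes the input of the next.

```python
from functools import reduce


def run_steps(initial, steps):
    """Hypothetical helper: feed each step's output into the next, like chained | operators."""
    return reduce(lambda collection, step: step(collection), steps, initial)


# Each step maps a whole "collection" (here, a plain list) to a new one
steps = [
    lambda lines: [line.strip() for line in lines],  # first transform
    lambda lines: [line for line in lines if line],  # second transform: drop empty lines
    lambda lines: [line.upper() for line in lines],  # third transform
]

if __name__ == '__main__':
    print(run_steps(["  get /a ", "", "post /b"], steps))  # ['GET /A', 'POST /B']
```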

In our code we will create two custom functions. The first, regex_clean, searches the data and extracts the substring matching each pattern in the PATTERNS list using the re.search function. It returns a comma-separated string. If you are not a regular expression expert, I recommend going through this tutorial and practicing in a notepad to test the code. After that, we define a custom ParDo function called Split, which is a type of Beam transform for parallel processing. In Python this is done in a particular way: we have to create a class that inherits from the Beam DoFn class. The Split function takes the parsed string from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table.

There is one thing to note about this function: I had to import datetime inside the function for it to work. I was getting an import error when importing it at the top of the file, which was strange. A likely explanation is that Dataflow workers do not see module-level imports from the main script unless the pipeline is run with the save_main_session option.

This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the batch DataFlow job and the streaming DataFlow job is given below. The only difference between the batch and streaming code is that in the batch job we read a CSV from src_path using Beam's ReadFromText, whereas the streaming job reads messages from our Pub/Sub topic with ReadFromPubSub and decodes them from bytes.
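The sketch below isolates the extraction step, using three of the article's PATTERNS plus one hypothetical pattern that cannot match. Because re.search returns None for a pattern with no match, the helper appends an empty placeholder instead, so every value keeps its position in the comma-separated output. The helper name `extract_fields` and the sample log line are made up for illustration.

```python
import re

# Three patterns from the article (IP address, bracketed timestamp, status code)
# plus a hypothetical pattern that has no match in the sample line
PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])', r'\s(\d+)\s', r'"DELETE ']


def extract_fields(line, patterns):
    """Hypothetical helper: one comma-separated field per pattern, '' when nothing matches."""
    result = []
    for pattern in patterns:
        match = re.search(pattern, line)
        # match.group() is the whole match, including surrounding whitespace
        result.append(match.group().strip() if match else "")
    return ",".join(result)


# Made-up log line for illustration only
SAMPLE = ('192.168.0.1 - - [10/Apr/2019:13:45:07] "GET /index.html HTTP/1.1" '
          '200 1043 "http://www.example.com" "Mozilla/5.0"')

if __name__ == '__main__':
    print(extract_fields(SAMPLE, PATTERNS))  # 192.168.0.1,10/Apr/2019:13:45:07,200,
```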

Batch DataFlow job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      # re.search returns None when a pattern does not match; append a
      # placeholder instead so every field keeps its column position
      reg_match = re.search(match, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
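Since Split.process is plain Python, you can check its logic without Beam or BigQuery. The sketch below reproduces the batch version's column mapping and timestamp conversion in a standalone function and prints the dictionary that would be handed to WriteToBigQuery. The function name `split_row` and the sample row are hypothetical.

```python
from datetime import datetime

# Column order produced by regex_clean, matching the batch Split class
COLUMNS = ['remote_addr', 'timelocal', 'request_type', 'status',
           'body_bytes_sent', 'http_referer', 'http_user_agent']


def split_row(row):
    """Hypothetical standalone version of Split.process for local testing."""
    element = row.split(",")
    # Parse a log timestamp such as 10/Apr/2019:13:45:07 and reformat it as 2019-04-10 13:45:07
    d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
    element[1] = d.strftime("%Y-%m-%d %H:%M:%S")
    # Like Split.process, return a list containing one dictionary per row
    return [dict(zip(COLUMNS, element))]


if __name__ == '__main__':
    row = ('192.168.0.1,10/Apr/2019:13:45:07,GET /index.html HTTP/1.1,'
           '200,1043,http://www.example.com,Mozilla')
    print(split_row(row))
```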

Streaming DataFlow job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      # re.search returns None when a pattern does not match; append a
      # placeholder instead so every field keeps its column position
      reg_match = re.search(match, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
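   # parse_known_args returns (known_args, remaining_args); the remaining
   # flags (--runner, --project, --streaming, ...) are passed on to Beam
   known_args, pipeline_args = parser.parse_known_args(argv)
   # Fall back to the hard-coded topic when --input_topic is not given
   input_topic = known_args.input_topic or TOPIC


   p = beam.Pipeline(options=PipelineOptions(pipeline_args))

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)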
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
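The publisher encodes each log line with message.encode('utf-8'), and the streaming pipeline's Decode step reverses this, because ReadFromPubSub yields raw bytes. The stdlib-only sketch below checks that round trip and builds the topic string in the projects/<project>/topics/<topic> form used by the TOPIC constant. The helper names are hypothetical, not Pub/Sub client methods.

```python
def topic_name(project_id, topic):
    """Hypothetical helper mirroring the projects/<project>/topics/<topic> format."""
    return "projects/{}/topics/{}".format(project_id, topic)


def encode_log(line):
    # What publish() does before sending: Pub/Sub message data must be bytes
    return line.encode('utf-8')


def decode_log(data):
    # What the pipeline's "Decode" step does with each message read from Pub/Sub
    return data.decode('utf-8')


if __name__ == '__main__':
    line = '192.168.0.1 - - [10/Apr/2019:13:45:07] "GET /café HTTP/1.1" 200 1043'
    assert decode_log(encode_log(line)) == line
    print(topic_name("user-logs-237110", "userlogs"))  # projects/user-logs-237110/topics/userlogs
```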

Running the pipeline

We can run the pipeline in a few different ways. If we wanted to, we could simply run it locally from the terminal, provided we are logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it with DataFlow. We can do this with the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - a Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
  • temp_location - a Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline is running.
  • streaming - runs the pipeline in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--staging_location gs://$BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me resolve pipeline issues on a number of occasions.

Figure 4: Beam pipeline
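The streaming script's main() separates its own flags from the ones Beam consumes using argparse.parse_known_args, which returns a tuple of (recognized arguments, everything else). The sketch below runs that split on a flag list similar to the Dataflow command above. The `split_args` name is hypothetical, and the project and bucket values are placeholders.

```python
import argparse


def split_args(argv):
    """Hypothetical helper: separate the script's own flags from those meant for Beam/Dataflow."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    # known_args holds --input_topic/--output; pipeline_args keeps the rest in order
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == '__main__':
    known, rest = split_args([
        "--input_topic", "projects/my-project/topics/userlogs",
        "--runner", "DataflowRunner",
        "--project", "my-project",
        "--temp_location", "gs://my-bucket/tmp",
        "--streaming",
    ])
    print(known.input_topic)  # projects/my-project/topics/userlogs
    print(rest)
```

The second list is what gets passed to PipelineOptions, so Beam sees --runner, --project and the other flags, but not --input_topic.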

Accessing our data in BigQuery

So we should now have a running pipeline with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery
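Note the two spellings of the table name in this article: the pipeline code passes project:dataset.table (with a colon) to WriteToBigQuery, while the standard SQL query above uses project.dataset.table in backticks. The small sketch below converts the former into the latter and builds the same preview query. The helper names are hypothetical.

```python
def to_sql_table(beam_table_spec):
    """Hypothetical helper: 'project:dataset.table' -> '`project.dataset.table`'."""
    project, dataset_table = beam_table_spec.split(":", 1)
    return "`{}.{}`".format(project, dataset_table)


def preview_query(beam_table_spec, limit=10):
    # Same shape as the query above: the first few rows of the table
    return "SELECT * FROM {} LIMIT {};".format(to_sql_table(beam_table_spec), limit)


if __name__ == '__main__':
    print(preview_query("user-logs-237110:userlogs.logdata"))
    # SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
```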

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline, and of finding ways to make data more accessible. Storing data in this form gives us many advantages. We can now start answering useful questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And are there errors where there shouldn't be? These are the kinds of questions an organization will care about. Based on the insights from the answers, we can improve the product and increase user engagement.

Beam is really useful for this kind of exercise and has a number of other interesting use cases. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or you might have sensor data coming from vehicles and want to calculate traffic levels. You could also be a gaming company that collects user data and uses it to build dashboards that track key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, here is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
