Við búum til straumgagnavinnsluleiðslu. 2. hluti

Hæ allir. Við erum að deila þýðingunni á síðasta hluta greinarinnar, unnin sérstaklega fyrir nemendur námskeiðsins. "gagnaverkfræðingur". Þú getur lesið fyrsta hlutann hér.

Apache Beam og DataFlow fyrir rauntíma leiðslur

Við búum til straumgagnavinnsluleiðslu. 2. hluti

Setja upp Google Cloud

Athugið: Ég notaði Google Cloud Shell til að keyra leiðsluna og birta sérsniðin annálsgögn vegna þess að ég átti í vandræðum með að keyra leiðsluna í Python 3. Google Cloud Shell notar Python 2, sem er meira í samræmi við Apache Beam.

Til að hefja leiðsluna þurfum við að grafa aðeins ofan í stillingarnar. Fyrir ykkur sem ekki hafið notað GCP áður, þið þurfið að fylgja eftirfarandi 6 skrefum sem lýst er í þessu síðu.

Eftir þetta þurfum við að hlaða upp forskriftunum okkar í Google Cloud Storage og afrita þau yfir á Google Cloud Shel okkar. Upphleðsla í skýjageymslu er frekar léttvæg (lýsing er að finna hér). Til að afrita skrárnar okkar getum við opnað Google Cloud Shel frá tækjastikunni með því að smella á fyrsta táknið til vinstri á mynd 2 hér að neðan.

Við búum til straumgagnavinnsluleiðslu. 2. hluti
Mynd 2

Skipanirnar sem við þurfum til að afrita skrárnar og setja upp nauðsynleg bókasöfn eru taldar upp hér að neðan.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Að búa til gagnagrunn okkar og töflu

Þegar við höfum lokið öllum uppsetningartengdum skrefum er það næsta sem við þurfum að gera að búa til gagnapakka og töflu í BigQuery. Það eru nokkrar leiðir til að gera þetta, en einfaldast er að nota Google Cloud stjórnborðið með því að búa til gagnasafn. Þú getur fylgst með skrefunum hér að neðan tengilltil að búa til töflu með skema. Borðið okkar mun hafa 7 dálkar, sem samsvarar íhlutum hvers notendaskrár. Til hægðarauka munum við skilgreina alla dálka sem strengi, nema tímabundnu breytuna, og nefna þá í samræmi við breyturnar sem við bjuggum til áðan. Skipulag töflunnar okkar ætti að líta út eins og á mynd 3.

Við búum til straumgagnavinnsluleiðslu. 2. hluti
Mynd 3. Taflaskipulag

Birting notendaskrárgagna

Pub/Sub er mikilvægur hluti af leiðslu okkar vegna þess að það gerir mörgum sjálfstæðum forritum kleift að eiga samskipti sín á milli. Sérstaklega virkar það sem milliliður sem gerir okkur kleift að senda og taka á móti skilaboðum á milli forrita. Það fyrsta sem við þurfum að gera er að búa til umræðuefni. Farðu einfaldlega í Pub/Sub í stjórnborðinu og smelltu á CREATE TOPIC.

Kóðinn hér að neðan kallar á handritið okkar til að búa til loggögnin sem skilgreind eru hér að ofan og tengir síðan og sendir annálana til Pub/Sub. Það eina sem við þurfum að gera er að búa til hlut PublisherClient, tilgreindu slóðina að efninu með því að nota aðferðina topic_path og hringdu í fallið publish с topic_path og gögn. Athugið að við flytjum inn generate_log_line úr handritinu okkar stream_logs, svo vertu viss um að þessar skrár séu í sömu möppu, annars færðu innflutningsvillu. Við getum síðan keyrt þetta í gegnum Google stjórnborðið okkar með því að nota:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Um leið og skráin keyrir munum við geta séð úttakið af annálagögnunum á stjórnborðið, eins og sýnt er á myndinni hér að neðan. Þetta handrit mun virka svo lengi sem við notum það ekki CTRL + Cað klára það.

Við búum til straumgagnavinnsluleiðslu. 2. hluti
Mynd 4. Framleiðsla publish_logs.py

Að skrifa leiðslukóðann okkar

Nú þegar allt er undirbúið getum við byrjað á skemmtilega hlutanum - að kóða leiðsluna okkar með Beam og Python. Til að búa til Beam leiðslu, þurfum við að búa til leiðsluhlut (p). Þegar við höfum búið til leiðsluhlut, getum við beitt mörgum aðgerðum á eftir annarri með því að nota rekstraraðilann pipe (|). Almennt séð lítur verkflæðið út eins og myndin hér að neðan.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Í kóðanum okkar munum við búa til tvær sérsniðnar aðgerðir. Virka regex_clean, sem skannar gögnin og sækir samsvarandi línu byggt á PATTERNS listanum með því að nota aðgerðina re.search. Fallið skilar streng aðskilinn með kommum. Ef þú ert ekki sérfræðingur í reglubundinni tjáningu mæli ég með að kíkja á þetta kennsluefni og æfðu þig í skrifblokk til að athuga kóðann. Eftir þetta skilgreinum við sérsniðna ParDo aðgerð sem kallast Split, sem er afbrigði af Beam transform fyrir samhliða vinnslu. Í Python er þetta gert á sérstakan hátt - við verðum að búa til flokk sem erfir frá DoFn Beam bekknum. Skipta fallið tekur þáttuðu línuna frá fyrri falli og skilar lista yfir orðabækur með lyklum sem samsvara dálknöfnunum í BigQuery töflunni okkar. Það er eitthvað að athuga við þessa aðgerð: Ég þurfti að flytja inn datetime inni í aðgerð til að láta það virka. Ég var að fá innflutningsvillu í byrjun skráarinnar, sem var skrítið. Þessi listi er síðan sendur til aðgerðarinnar WriteToBigQuery, sem einfaldlega bætir gögnum okkar við töfluna. Kóðinn fyrir Batch DataFlow Job og Streaming DataFlow Job er gefinn upp hér að neðan. Eini munurinn á lotu og streymiskóða er sá að í lotu lesum við CSV frá src_pathmeð því að nota aðgerðina ReadFromText frá Beam.

Batch DataFlow Job (lotuvinnsla)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (straumvinnsla)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ræsir færibandið

Við getum keyrt leiðsluna á nokkra mismunandi vegu. Ef við vildum gætum við bara keyrt það á staðnum frá flugstöðinni á meðan við skráðum okkur inn á GCP fjarstýrt.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Hins vegar ætlum við að keyra það með DataFlow. Við getum gert þetta með því að nota skipunina hér að neðan með því að stilla eftirfarandi nauðsynlegar breytur.

  • project — Auðkenni GCP verkefnisins þíns.
  • runner er leiðsluhlaupari sem mun greina forritið þitt og smíða leiðsluna þína. Til að keyra í skýinu verður þú að tilgreina DataflowRunner.
  • staging_location — slóðin að Cloud Dataflow skýgeymslunni fyrir flokkun kóðapakka sem örgjörvarnir sem vinna verkið þurfa.
  • temp_location — slóð að Cloud Dataflow skýgeymslunni til að geyma tímabundnar verkskrár sem eru búnar til á meðan leiðslan er í gangi.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Á meðan þessi skipun er í gangi getum við farið í DataFlow flipann í Google stjórnborðinu og skoðað leiðsluna okkar. Þegar við smellum á leiðsluna ættum við að sjá eitthvað svipað og á mynd 4. Í villuleitarskyni getur verið mjög gagnlegt að fara í Logs og síðan í Stackdriver til að skoða nákvæma logs. Þetta hefur hjálpað mér að leysa leiðsluvandamál í mörgum tilfellum.

Við búum til straumgagnavinnsluleiðslu. 2. hluti
Mynd 4: Geislafæriband

Fáðu aðgang að gögnum okkar í BigQuery

Þannig að við ættum nú þegar að vera með leiðslu í gangi með gögn sem streyma inn í borðið okkar. Til að prófa þetta getum við farið í BigQuery og skoðað gögnin. Eftir að þú hefur notað skipunina hér að neðan ættir þú að sjá fyrstu línurnar í gagnapakkanum. Nú þegar við höfum gögnin geymd í BigQuery getum við framkvæmt frekari greiningu, auk þess að deila gögnunum með samstarfsfólki og byrjað að svara viðskiptaspurningum.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Við búum til straumgagnavinnsluleiðslu. 2. hluti
Mynd 5: BigQuery

Ályktun

Við vonum að þessi færsla þjóni sem gagnlegt dæmi um að búa til streymisgagnaleiðslu, auk þess að finna leiðir til að gera gögn aðgengilegri. Að geyma gögn á þessu sniði gefur okkur marga kosti. Nú getum við byrjað að svara mikilvægum spurningum eins og hversu margir nota vöruna okkar? Er notendahópur þinn að stækka með tímanum? Hvaða þætti vörunnar hefur fólk mest samskipti við? Og eru villur þar sem þær ættu ekki að vera? Þetta eru spurningarnar sem munu vekja áhuga stofnunarinnar. Byggt á þeirri innsýn sem kemur fram í svörum við þessum spurningum getum við bætt vöruna og aukið þátttöku notenda.

Beam er mjög gagnlegt fyrir þessa tegund af æfingum og hefur einnig fjölda annarra áhugaverðra notkunartilvika. Til dæmis gætirðu viljað greina hlutabréfamerkisgögn í rauntíma og gera viðskipti byggð á greiningunni, kannski ertu með skynjaragögn sem koma frá ökutækjum og vilt reikna út útreikninga á umferðarstigi. Þú gætir líka, til dæmis, verið leikjafyrirtæki sem safnar notendagögnum og notar þau til að búa til mælaborð til að fylgjast með lykilmælingum. Allt í lagi, herrar mínir, þetta er efni í aðra færslu, takk fyrir að lesa, og fyrir þá sem vilja sjá allan kóðann, hér að neðan er hlekkurinn á GitHub minn.

https://github.com/DFoly/User_log_pipeline

Það er allt og sumt. Lestu fyrsta hluta.

Heimild: www.habr.com

Bæta við athugasemd