Luomme stream-tiedonkäsittelyputken. Osa 2

Hei kaikki. Jaamme artikkelin viimeisen osan käännöksen, joka on valmistettu erityisesti kurssin opiskelijoille. Tietojen insinööri. Voit lukea ensimmäisen osan täällä.

Apache Beam ja DataFlow reaaliaikaisia ​​putkia varten

Luomme stream-tiedonkäsittelyputken. Osa 2

Google Cloudin määrittäminen

Huomautus: Käytin Google Cloud Shelliä liukuhihnan suorittamiseen ja mukautettujen lokitietojen julkaisemiseen, koska minulla oli ongelmia liukuhihnan suorittamisessa Python 3:ssa. Google Cloud Shell käyttää Python 2:ta, joka on yhdenmukaisempi Apache Beamin kanssa.

Liukulinjan aloittamiseksi meidän on kaivettava hieman asetuksia. Niiden teistä, jotka eivät ole aiemmin käyttäneet GCP:tä, sinun on noudatettava tässä kuvattuja 6 vaihetta sivu.

Tämän jälkeen meidän on ladattava skriptimme Google Cloud Storageen ja kopioitava ne Google Cloud Sheliin. Lataaminen pilvitallennustilaan on melko triviaalia (kuvaus löytyy täällä). Kopioidaksemme tiedostomme voimme avata Google Cloud Shelin työkalupalkista napsauttamalla ensimmäistä kuvaketta vasemmalla alla olevassa kuvassa 2.

Luomme stream-tiedonkäsittelyputken. Osa 2
Kuvio 2

Alla on lueteltu komennot, joita tarvitsemme tiedostojen kopioimiseen ja tarvittavien kirjastojen asentamiseen.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Tietokannan ja taulukon luominen

Kun olemme suorittaneet kaikki määritykseen liittyvät vaiheet, meidän on seuraavaksi luotava tietojoukko ja taulukko BigQueryssa. On olemassa useita tapoja tehdä tämä, mutta yksinkertaisin on käyttää Google Cloud -konsolia luomalla ensin tietojoukko. Voit seurata alla olevia ohjeita linkkiluodaksesi kaavion sisältävän taulukon. Pöydässämme on 7 saraketta, joka vastaa kunkin käyttäjälokin osia. Mukavuuden vuoksi määritämme kaikki sarakkeet merkkijonoiksi paitsi aikapaikalliseksi muuttujaksi ja nimeämme ne aiemmin luomiemme muuttujien mukaan. Pöydän asettelun tulisi näyttää kuvassa 3.

Luomme stream-tiedonkäsittelyputken. Osa 2
Kuva 3. Taulukon asettelu

Käyttäjälokitietojen julkaiseminen

Pub/Sub on putkistomme kriittinen osa, koska sen avulla useat itsenäiset sovellukset voivat kommunikoida keskenään. Se toimii erityisesti välittäjänä, jonka avulla voimme lähettää ja vastaanottaa viestejä sovellusten välillä. Ensimmäinen asia, joka meidän on tehtävä, on luoda aihe. Mene vain konsolin Pub/Sub-kohtaan ja napsauta LUO AIHE.

Alla oleva koodi kutsuu komentosarjaamme luomaan yllä määritellyt lokitiedot ja muodostaa sitten yhteyden ja lähettää lokit Pub/Subille. Ainoa asia, joka meidän on tehtävä, on luoda esine PublisherClient, määritä polku aiheeseen menetelmällä topic_path ja kutsu funktio publish с topic_path ja dataa. Huomaa, että tuomme generate_log_line käsikirjoituksestamme stream_logs, joten varmista, että nämä tiedostot ovat samassa kansiossa, muuten saat tuontivirheen. Voimme sitten suorittaa tämän Google-konsolimme kautta käyttämällä:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Heti kun tiedosto suoritetaan, voimme nähdä lokitietojen tulosteen konsoliin alla olevan kuvan mukaisesti. Tämä skripti toimii niin kauan kuin emme käytä CTRL + Ctäydentämään sitä.

Luomme stream-tiedonkäsittelyputken. Osa 2
Kuva 4. Lähtö publish_logs.py

Kirjoitetaan putkistokoodiamme

Nyt kun meillä on kaikki valmiina, voimme aloittaa hauskan osan - putkilinjamme koodaamisen Beamilla ja Pythonilla. Beam-liukuhihnan luomiseksi meidän on luotava liukuhihnaobjekti (p). Kun olemme luoneet liukuhihnaobjektin, voimme käyttää useita toimintoja peräkkäin käyttämällä operaattoria pipe (|). Yleisesti ottaen työnkulku näyttää alla olevan kuvan kalta.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Luomme koodissamme kaksi mukautettua funktiota. Toiminto regex_clean, joka skannaa tiedot ja hakee vastaavan rivin PATTERNS-luettelon perusteella funktion avulla re.search. Funktio palauttaa pilkuilla erotetun merkkijonon. Jos et ole säännöllisten lausekkeiden asiantuntija, suosittelen tutustumaan tähän opetusohjelma ja harjoittele koodin tarkistamista muistilehtiössä. Tämän jälkeen määritämme mukautetun ParDo-funktion nimeltä Jakaa, joka on muunnelma Beam-muunnoksesta rinnakkaiskäsittelyä varten. Pythonissa tämä tehdään erityisellä tavalla - meidän on luotava luokka, joka perii DoFn Beam -luokasta. Split-funktio ottaa jäsennetyn rivin edellisestä funktiosta ja palauttaa luettelon sanakirjoista, joiden avaimet vastaavat BigQuery-taulukon sarakkeiden nimiä. Tässä toiminnossa on jotain huomioitavaa: minun piti tuoda datetime toiminnon sisällä saadaksesi sen toimimaan. Sain tuontivirheen tiedoston alussa, mikä oli outoa. Tämä luettelo välitetään sitten funktiolle WriteToBigQuery, joka yksinkertaisesti lisää tietomme taulukkoon. Batch DataFlow Jobin ja Streaming DataFlow Jobin koodi on annettu alla. Ainoa ero erä- ja suoratoistokoodin välillä on se, että erässä luemme CSV:n src_pathfunktiota käyttämällä ReadFromText Beamista.

Erä DataFlow Job (eräkäsittely)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

DataFlow-työn suoratoisto (streamin käsittely)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Kuljettimen käynnistäminen

Voimme ajaa putkia useilla eri tavoilla. Jos halusimme, voisimme vain ajaa sen paikallisesti päätteestä samalla kun kirjaudumme GCP:hen etänä.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Aiomme kuitenkin suorittaa sen käyttämällä DataFlow'ta. Voimme tehdä tämän käyttämällä alla olevaa komentoa asettamalla seuraavat pakolliset parametrit.

  • project — GCP-projektisi tunnus.
  • runner on putkisto, joka analysoi ohjelmasi ja rakentaa putkilinjasi. Jotta voit suorittaa pilvessä, sinun on määritettävä DataflowRunner.
  • staging_location — polku Cloud Dataflow -pilvitallennustilaan työn suorittavien prosessorien tarvitsemien koodipakettien indeksointia varten.
  • temp_location — polku Cloud Dataflow -pilvitallennustilaan liukuhihnan ollessa käynnissä luotujen väliaikaisten työtiedostojen tallentamista varten.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Kun tämä komento on käynnissä, voimme siirtyä Google-konsolin DataFlow-välilehteen ja tarkastella putkistoamme. Kun napsautamme liukuhihnaa, meidän pitäisi nähdä jotain kuvan 4 kaltaista. Viankorjaustarkoituksiin voi olla erittäin hyödyllistä siirtyä lokeihin ja sitten Stackdriveriin tarkastellaksesi yksityiskohtaisia ​​lokeja. Tämä on auttanut minua ratkaisemaan putkien ongelmia useissa tapauksissa.

Luomme stream-tiedonkäsittelyputken. Osa 2
Kuva 4: Palkkikuljetin

Pääsy tietoihimme BigQueryssa

Meillä pitäisi siis jo olla käynnissä putki, jonka tiedot virtaavat taulukkoomme. Tämän testaamiseksi voimme siirtyä BigQueryyn ja tarkastella tietoja. Alla olevan komennon käytön jälkeen sinun pitäisi nähdä tietojoukon ensimmäiset rivit. Nyt kun data on tallennettu BigQueryyn, voimme tehdä lisäanalyysejä, jakaa tiedot työtovereiden kanssa ja alkaa vastata liiketoimintakysymyksiin.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Luomme stream-tiedonkäsittelyputken. Osa 2
Kuva 5: BigQuery

Johtopäätös

Toivomme, että tämä viesti toimii hyödyllisenä esimerkkinä suoratoistodatan luomisesta sekä tapojen löytämisestä datan helpottamiseksi. Tietojen tallentaminen tässä muodossa antaa meille monia etuja. Nyt voimme alkaa vastata tärkeisiin kysymyksiin, kuten kuinka moni käyttää tuotettamme? Kasvaako käyttäjäkuntasi ajan myötä? Mihin tuotteen osa-alueisiin ihmiset ovat eniten vuorovaikutuksessa? Ja onko virheitä siellä, missä niitä ei pitäisi olla? Nämä ovat kysymyksiä, jotka kiinnostavat organisaatiota. Näiden kysymysten vastauksista saamien oivallusten perusteella voimme parantaa tuotetta ja lisätä käyttäjien sitoutumista.

Beam on todella hyödyllinen tämäntyyppisessä harjoituksessa ja sillä on myös monia muita mielenkiintoisia käyttötapauksia. Voit esimerkiksi analysoida osakekurssitietoja reaaliajassa ja tehdä kauppoja analyysin perusteella, kenties sinulla on ajoneuvoista tulevaa anturidataa ja haluat laskea liikennetason laskelmia. Voit myös olla esimerkiksi peliyritys, joka kerää käyttäjätietoja ja luo sen avulla kojetauluja tärkeimpien mittareiden seuraamiseksi. Okei, herrat, tämä on toisen postauksen aihe, kiitos lukemisesta, ja niille, jotka haluavat nähdä koko koodin, alla on linkki GitHubiini.

https://github.com/DFoly/User_log_pipeline

Se on kaikki. Lue osa yksi.

Lähde: will.com

Lisää kommentti