Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész

Sziasztok. Megosztjuk a cikk utolsó részének fordítását, amely kifejezetten a kurzus hallgatói számára készült. Adatmérnök. Az első részt olvashatjátok itt.

Apache Beam és DataFlow valós idejű csővezetékekhez

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész

A Google Cloud beállítása

Megjegyzés: A folyamat futtatásához és az egyéni naplóadatok közzétételéhez a Google Cloud Shell szolgáltatást használtam, mert problémáim voltak a folyamat Python 3-ban való futtatásával. A Google Cloud Shell a Python 2-t használja, amely jobban konzisztens az Apache Beammel.

A csővezeték elindításához egy kicsit bele kell ásnunk a beállításokba. Azok számára, akik korábban nem használták a GCP-t, az alábbi 6 lépést kell követniük oldal.

Ezt követően fel kell töltenünk szkriptjeinket a Google Cloud Storage szolgáltatásba, és át kell másolnunk őket a Google Cloud Shelbe. A felhőtárhelyre való feltöltés meglehetősen triviális (leírás található itt). Fájljaink másolásához megnyithatjuk a Google Cloud Shel alkalmazást az eszköztárról az alábbi 2. ábra bal oldalán található első ikonra kattintva.

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész
Ábra 2

Az alábbiakban felsoroljuk azokat a parancsokat, amelyekre szükségünk van a fájlok másolásához és a szükséges könyvtárak telepítéséhez.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Adatbázisunk és táblázatunk elkészítése

Miután elvégeztük az összes beállítással kapcsolatos lépést, a következő dolgunk az, hogy létrehozunk egy adatkészletet és táblázatot a BigQueryben. Ennek többféle módja is van, de a legegyszerűbb a Google Cloud konzol használata az adatkészlet létrehozásával. Kövesse az alábbi lépéseket linktáblázat létrehozásához egy sémával. A mi asztalunk lesz 7 oszlop, amely megfelel az egyes felhasználói naplók összetevőinek. A kényelem kedvéért az összes oszlopot karakterláncként definiáljuk, kivéve a timelocal változót, és a korábban generált változók szerint nevezzük el őket. A táblázatunk elrendezése úgy nézzen ki, mint a 3. ábra.

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész
3. ábra Táblázat elrendezése

Felhasználói naplóadatok közzététele

A Pub/Sub kritikus összetevője folyamatunknak, mivel lehetővé teszi több független alkalmazás egymás közötti kommunikációját. Különösen közvetítőként működik, amely lehetővé teszi számunkra, hogy üzeneteket küldjünk és fogadjunk az alkalmazások között. Az első dolog, amit tennünk kell, egy téma létrehozása. Egyszerűen lépjen a Pub/Sub elemre a konzolon, és kattintson a TÉMA LÉTREHOZÁSA lehetőségre.

Az alábbi kód meghívja a szkriptünket, hogy létrehozza a fent meghatározott naplóadatokat, majd összekapcsolja és elküldi a naplókat a Pub/Sub-nak. Az egyetlen dolog, amit tennünk kell, egy objektum létrehozása PublisherClient, adja meg a téma elérési útját a metódus segítségével topic_path és hívja meg a függvényt publish с topic_path és adatok. Felhívjuk figyelmét, hogy importálunk generate_log_line forgatókönyvünkből stream_logs, ezért ügyeljen arra, hogy ezek a fájlok ugyanabban a mappában legyenek, különben importálási hibaüzenetet kap. Ezt követően lefuttathatjuk ezt a google konzolunkon a következő használatával:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Amint a fájl lefut, láthatjuk a naplóadatok kimenetét a konzolon, ahogy az az alábbi ábrán is látható. Ez a szkript mindaddig működik, amíg nem használjuk CTRL + Cbefejezni.

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész
4. ábra Kimenet publish_logs.py

Csővezeték kódunk írása

Most, hogy mindent előkészítettünk, elkezdhetjük a szórakoztató részt – a folyamat kódolását Beam és Python segítségével. Beam pipeline létrehozásához létre kell hoznunk egy folyamat objektumot (p). Miután létrehoztunk egy pipeline objektumot, az operátor segítségével egymás után több függvényt is alkalmazhatunk pipe (|). Általában a munkafolyamat úgy néz ki, mint az alábbi képen.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Kódunkban két egyedi függvényt fogunk létrehozni. Funkció regex_clean, amely beolvassa az adatokat, és a funkció segítségével lekéri a megfelelő sort a MINTÁK lista alapján re.search. A függvény vesszővel elválasztott karakterláncot ad vissza. Ha nem vagy reguláris kifejezések szakértője, azt javaslom, hogy nézze meg ezt oktatóanyag és gyakorolja a jegyzettömbben a kód ellenőrzését. Ezt követően definiálunk egy egyedi ParDo függvényt Hasított, amely a Beam transzformáció egy változata párhuzamos feldolgozásra. A Pythonban ez speciális módon történik - létre kell hoznunk egy osztályt, amely örökli a DoFn Beam osztályt. A Felosztás függvény átveszi az előző függvény elemzett sorát, és visszaadja a szótárak listáját a BigQuery-táblázatunk oszlopneveinek megfelelő kulcsokkal. Ezzel a funkcióval kapcsolatban érdemes megjegyezni: importálni kellett datetime egy függvényen belül, hogy működjön. A fájl elején importálási hibát kaptam, ami furcsa volt. Ezt a listát ezután átadja a függvénynek WriteToBigQuery, amely egyszerűen hozzáadja adatainkat a táblázathoz. A Batch DataFlow Job és a Streaming DataFlow Job kódja alább látható. Az egyetlen különbség a kötegelt és a streaming kód között az, hogy kötegben a CSV-t olvassuk ki src_pathfunkció használatával ReadFromText a Beamtől.

Batch DataFlow Job (kötegelt feldolgozás)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow feladat (adatfolyam feldolgozás)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

A szállítószalag indítása

A csővezetéket többféleképpen is futtathatjuk. Ha akarjuk, csak helyileg futtathatjuk egy terminálról, miközben távolról bejelentkezünk a GCP-be.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Azonban a DataFlow használatával fogjuk futtatni. Ezt az alábbi paranccsal tehetjük meg a következő szükséges paraméterek beállításával.

  • project — GCP-projektjének azonosítója.
  • runner egy folyamatfutó, amely elemzi a programját, és megszerkeszti a folyamatot. A felhőben való futtatáshoz meg kell adnia egy DataflowRunner-t.
  • staging_location — a Cloud Dataflow felhőtároló elérési útja a munkát végző processzorok által szükséges kódcsomagok indexeléséhez.
  • temp_location — a Cloud Dataflow felhőtárhely elérési útja a folyamat közben létrehozott ideiglenes munkafájlok tárolására.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Amíg ez a parancs fut, a google konzol DataFlow lapjára léphetünk, és megtekinthetjük a folyamatunkat. Amikor rákattintunk a folyamatra, valami hasonlót kell látnunk, mint a 4. ábra. Hibakeresési célokra nagyon hasznos lehet a Naplók, majd a Stackdriver megnyitása a részletes naplók megtekintéséhez. Ez számos esetben segített megoldani a csővezeték-problémákat.

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész
4. ábra: Gerenda szállítószalag

Hozzáférhet adatainkhoz a BigQuery szolgáltatásban

Tehát már futnia kell egy folyamatnak, amelyen az adatok a táblánkba áramlanak. Ennek teszteléséhez felkereshetjük a BigQuery-t, és megnézhetjük az adatokat. Az alábbi parancs használata után látnia kell az adatkészlet első néhány sorát. Most, hogy a BigQuery-ben tárolt adatok birtokában vagyunk, további elemzéseket végezhetünk, valamint megoszthatjuk az adatokat a kollégákkal, és elkezdhetünk válaszolni az üzleti kérdésekre.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Létrehozunk egy adatfolyam adatfeldolgozási folyamatot. 2. rész
5. ábra: BigQuery

Következtetés

Reméljük, hogy ez a bejegyzés hasznos példaként szolgál a streaming adatfolyam létrehozására, valamint arra, hogy módokat találjunk az adatok hozzáférhetőbbé tételére. Az adatok tárolása ebben a formátumban számos előnnyel jár. Most elkezdhetünk választ adni olyan fontos kérdésekre, mint például, hogy hányan használják termékünket? Növekszik a felhasználói bázisa az idő múlásával? A termék mely aspektusaival lépnek kapcsolatba a legtöbbet az emberek? És vannak olyan hibák, ahol nem kellene? Ezek azok a kérdések, amelyek érdekelni fogják a szervezetet. Az ezekre a kérdésekre adott válaszokból származó meglátások alapján javíthatjuk a terméket és növelhetjük a felhasználói elkötelezettséget.

A Beam nagyon hasznos az ilyen típusú gyakorlatokhoz, és számos más érdekes felhasználási esettel is rendelkezik. Például érdemes lehet valós időben elemezni a tőzsdei árfolyamadatokat, és az elemzés alapján kereskedéseket kötni, esetleg vannak járművekből származó szenzoradatok, és forgalomszint számításokat szeretne végezni. Lehetne például egy szerencsejáték-cég is, amely felhasználói adatokat gyűjt, és ezek alapján irányítópultokat hoz létre a legfontosabb mutatók követésére. Oké, uraim, ez egy másik bejegyzés témája, köszönöm, hogy elolvastad, és aki szeretné látni a teljes kódot, az alábbiakban a GitHub-om linkje.

https://github.com/DFoly/User_log_pipeline

Ez minden. Olvassa el az első részt.

Forrás: will.com

Hozzászólás