Nous créons un pipeline de traitement de données de flux. Partie 2

Salut tout le monde. Nous partageons la traduction de la dernière partie de l'article, préparée spécifiquement pour les étudiants du cours. Ingénieur de données. Vous pouvez lire la première partie ici.

Apache Beam et DataFlow pour les pipelines en temps réel

Nous créons un pipeline de traitement de données de flux. Partie 2

Configuration de Google Cloud

Remarque : J'ai utilisé Google Cloud Shell pour exécuter le pipeline et publier des données de journal personnalisées, car je rencontrais des difficultés pour exécuter le pipeline dans Python 3. Google Cloud Shell utilise Python 2, qui est plus cohérent avec Apache Beam.

Pour démarrer le pipeline, nous devons creuser un peu dans les paramètres. Pour ceux d'entre vous qui n'ont jamais utilisé GCP auparavant, vous devrez suivre les 6 étapes suivantes décrites dans ce document. page.

Après cela, nous devrons télécharger nos scripts sur Google Cloud Storage et les copier sur notre Google Cloud Shel. Le téléchargement vers le stockage cloud est assez simple (une description peut être trouvée ici). Pour copier nos fichiers, nous pouvons ouvrir Google Cloud Shel depuis la barre d'outils en cliquant sur la première icône à gauche dans la figure 2 ci-dessous.

Nous créons un pipeline de traitement de données de flux. Partie 2
Figure 2

Les commandes dont nous avons besoin pour copier les fichiers et installer les bibliothèques requises sont répertoriées ci-dessous.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Création de notre base de données et de notre table

Une fois que nous avons terminé toutes les étapes liées à la configuration, la prochaine chose que nous devons faire est de créer un ensemble de données et une table dans BigQuery. Il existe plusieurs façons de procéder, mais la plus simple consiste à utiliser la console Google Cloud en créant d'abord un ensemble de données. Vous pouvez suivre les étapes ci-dessous lienpour créer une table avec un schéma. Notre table aura 7 colonnes, correspondant aux composants de chaque journal utilisateur. Pour plus de commodité, nous définirons toutes les colonnes sous forme de chaînes, à l'exception de la variable timelocal, et les nommerons en fonction des variables que nous avons générées précédemment. La disposition de notre tableau devrait ressembler à celle de la figure 3.

Nous créons un pipeline de traitement de données de flux. Partie 2
Figure 3. Disposition du tableau

Publication des données du journal utilisateur

Pub/Sub est un composant essentiel de notre pipeline car il permet à plusieurs applications indépendantes de communiquer entre elles. Il fonctionne notamment comme un intermédiaire qui nous permet d'envoyer et de recevoir des messages entre applications. La première chose que nous devons faire est de créer un sujet. Accédez simplement à Pub/Sub dans la console et cliquez sur CRÉER UN SUJET.

Le code ci-dessous appelle notre script pour générer les données de journal définies ci-dessus, puis se connecte et envoie les journaux à Pub/Sub. La seule chose que nous devons faire est de créer un objet ClientÉditeur, précisez le chemin d'accès au sujet à l'aide de la méthode topic_path et appelle la fonction publish с topic_path et des données. Veuillez noter que nous importons generate_log_line de notre script stream_logs, assurez-vous donc que ces fichiers se trouvent dans le même dossier, sinon vous obtiendrez une erreur d'importation. Nous pouvons ensuite l'exécuter via notre console Google en utilisant :

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Dès que le fichier sera exécuté, nous pourrons voir la sortie des données du journal sur la console, comme le montre la figure ci-dessous. Ce script fonctionnera tant que nous n'utilisons pas CTRL + Cpour le compléter.

Nous créons un pipeline de traitement de données de flux. Partie 2
Figure 4. Sortie publish_logs.py

Écrire notre code de pipeline

Maintenant que tout est préparé, nous pouvons commencer la partie la plus amusante : coder notre pipeline à l'aide de Beam et Python. Pour créer un pipeline Beam, nous devons créer un objet pipeline (p). Une fois que nous avons créé un objet pipeline, nous pouvons appliquer plusieurs fonctions les unes après les autres à l'aide de l'opérateur pipe (|). En général, le flux de travail ressemble à l'image ci-dessous.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Dans notre code, nous allons créer deux fonctions personnalisées. Fonction regex_clean, qui analyse les données et récupère la ligne correspondante en fonction de la liste PATTERNS à l'aide de la fonction re.search. La fonction renvoie une chaîne séparée par des virgules. Si vous n'êtes pas un expert en expressions régulières, je vous recommande de consulter ceci Didacticiel et entraînez-vous dans un bloc-notes pour vérifier le code. Après cela, nous définissons une fonction ParDo personnalisée appelée Diviser, qui est une variante de la transformée Beam pour le traitement parallèle. En Python, cela se fait d'une manière spéciale : nous devons créer une classe qui hérite de la classe DoFn Beam. La fonction Split prend la ligne analysée de la fonction précédente et renvoie une liste de dictionnaires avec des clés correspondant aux noms de colonnes de notre table BigQuery. Il y a quelque chose à noter à propos de cette fonction : j'ai dû importer datetime à l'intérieur d'une fonction pour la faire fonctionner. J'obtenais une erreur d'importation au début du fichier, ce qui était bizarre. Cette liste est ensuite transmise à la fonction Écrire vers BigQuery, qui ajoute simplement nos données au tableau. Le code du travail Batch DataFlow et du travail Streaming DataFlow est indiqué ci-dessous. La seule différence entre le code batch et le code streaming est que par lots, nous lisons le CSV à partir de src_pathen utilisant la fonction ReadFromText de Beam.

Travail DataFlow par lots (traitement par lots)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Tâche DataFlow en streaming (traitement de flux)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Démarrage du convoyeur

Nous pouvons gérer le pipeline de plusieurs manières différentes. Si nous le voulions, nous pourrions simplement l'exécuter localement à partir d'un terminal tout en nous connectant à GCP à distance.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Cependant, nous allons l'exécuter en utilisant DataFlow. Nous pouvons le faire en utilisant la commande ci-dessous en définissant les paramètres requis suivants.

  • project — ID de votre projet GCP.
  • runner est un exécuteur de pipeline qui analysera votre programme et construira votre pipeline. Pour exécuter dans le cloud, vous devez spécifier un DataflowRunner.
  • staging_location - le chemin d'accès au stockage cloud Cloud Dataflow pour l'indexation des packages de code nécessaires aux processeurs effectuant le travail.
  • temp_location - chemin d'accès au stockage cloud Cloud Dataflow pour stocker les fichiers de tâches temporaires créés pendant l'exécution du pipeline.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Pendant que cette commande est en cours d'exécution, nous pouvons accéder à l'onglet DataFlow dans la console Google et afficher notre pipeline. Lorsque nous cliquons sur le pipeline, nous devrions voir quelque chose de similaire à la figure 4. À des fins de débogage, il peut être très utile d'accéder aux journaux, puis à Stackdriver pour afficher les journaux détaillés. Cela m'a aidé à résoudre des problèmes de pipeline dans un certain nombre de cas.

Nous créons un pipeline de traitement de données de flux. Partie 2
Figure 4 : Convoyeur à poutre

Accédez à nos données dans BigQuery

Nous devrions donc déjà avoir un pipeline en cours d’exécution avec des données circulant dans notre table. Pour tester cela, nous pouvons accéder à BigQuery et examiner les données. Après avoir utilisé la commande ci-dessous, vous devriez voir les premières lignes de l'ensemble de données. Maintenant que nous disposons des données stockées dans BigQuery, nous pouvons effectuer une analyse plus approfondie, partager les données avec des collègues et commencer à répondre aux questions commerciales.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Nous créons un pipeline de traitement de données de flux. Partie 2
Figure 5 : BigQuery

Conclusion

Nous espérons que cet article servira d'exemple utile pour créer un pipeline de données en streaming, ainsi que pour trouver des moyens de rendre les données plus accessibles. Stocker les données dans ce format nous offre de nombreux avantages. Nous pouvons maintenant commencer à répondre à des questions importantes telles que combien de personnes utilisent notre produit ? Votre base d’utilisateurs augmente-t-elle avec le temps ? Avec quels aspects du produit les gens interagissent-ils le plus ? Et y a-t-il des erreurs là où il ne devrait pas y en avoir ? Telles sont les questions qui intéresseront l’organisation. Sur la base des informations qui émergent des réponses à ces questions, nous pouvons améliorer le produit et accroître l'engagement des utilisateurs.

Beam est vraiment utile pour ce type d’exercice et propose également un certain nombre d’autres cas d’utilisation intéressants. Par exemple, vous souhaiterez peut-être analyser les données de cotation boursière en temps réel et effectuer des transactions basées sur l'analyse, peut-être avez-vous des données de capteurs provenant de véhicules et souhaitez-vous calculer des calculs de niveau de trafic. Vous pourriez également, par exemple, être une société de jeux qui collecte les données des utilisateurs et les utilise pour créer des tableaux de bord permettant de suivre les indicateurs clés. D'accord, messieurs, c'est un sujet pour un autre article, merci d'avoir lu, et pour ceux qui veulent voir le code complet, vous trouverez ci-dessous le lien vers mon GitHub.

https://github.com/DFoly/User_log_pipeline

C'est tout. Lire la première partie.

Source: habr.com

Ajouter un commentaire