Salut tout le monde. Nous partageons la traduction de la dernière partie de l'article, préparée spécifiquement pour les étudiants du cours.
Apache Beam et DataFlow pour les pipelines en temps réel
Configuration de Google Cloud
Remarque : J'ai utilisé Google Cloud Shell pour exécuter le pipeline et publier des données de journal personnalisées, car je rencontrais des difficultés pour exécuter le pipeline dans Python 3. Google Cloud Shell utilise Python 2, qui est plus cohérent avec Apache Beam.
Pour démarrer le pipeline, nous devons creuser un peu dans les paramètres. Pour ceux d'entre vous qui n'ont jamais utilisé GCP auparavant, vous devrez suivre les 6 étapes suivantes décrites dans ce document.
Après cela, nous devrons télécharger nos scripts sur Google Cloud Storage et les copier sur notre Google Cloud Shel. Le téléchargement vers le stockage cloud est assez simple (une description peut être trouvée
Figure 2
Les commandes dont nous avons besoin pour copier les fichiers et installer les bibliothèques requises sont répertoriées ci-dessous.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Création de notre base de données et de notre table
Une fois que nous avons terminé toutes les étapes liées à la configuration, la prochaine chose que nous devons faire est de créer un ensemble de données et une table dans BigQuery. Il existe plusieurs façons de procéder, mais la plus simple consiste à utiliser la console Google Cloud en créant d'abord un ensemble de données. Vous pouvez suivre les étapes ci-dessous
Figure 3. Disposition du tableau
Publication des données du journal utilisateur
Pub/Sub est un composant essentiel de notre pipeline car il permet à plusieurs applications indépendantes de communiquer entre elles. Il fonctionne notamment comme un intermédiaire qui nous permet d'envoyer et de recevoir des messages entre applications. La première chose que nous devons faire est de créer un sujet. Accédez simplement à Pub/Sub dans la console et cliquez sur CRÉER UN SUJET.
Le code ci-dessous appelle notre script pour générer les données de journal définies ci-dessus, puis se connecte et envoie les journaux à Pub/Sub. La seule chose que nous devons faire est de créer un objet ClientÉditeur, précisez le chemin d'accès au sujet à l'aide de la méthode topic_path
et appelle la fonction publish
с topic_path
et des données. Veuillez noter que nous importons generate_log_line
de notre script stream_logs
, assurez-vous donc que ces fichiers se trouvent dans le même dossier, sinon vous obtiendrez une erreur d'importation. Nous pouvons ensuite l'exécuter via notre console Google en utilisant :
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Dès que le fichier sera exécuté, nous pourrons voir la sortie des données du journal sur la console, comme le montre la figure ci-dessous. Ce script fonctionnera tant que nous n'utilisons pas CTRL + Cpour le compléter.
Figure 4. Sortie publish_logs.py
Écrire notre code de pipeline
Maintenant que tout est préparé, nous pouvons commencer la partie la plus amusante : coder notre pipeline à l'aide de Beam et Python. Pour créer un pipeline Beam, nous devons créer un objet pipeline (p). Une fois que nous avons créé un objet pipeline, nous pouvons appliquer plusieurs fonctions les unes après les autres à l'aide de l'opérateur pipe (|)
. En général, le flux de travail ressemble à l'image ci-dessous.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Dans notre code, nous allons créer deux fonctions personnalisées. Fonction regex_clean
, qui analyse les données et récupère la ligne correspondante en fonction de la liste PATTERNS à l'aide de la fonction re.search
. La fonction renvoie une chaîne séparée par des virgules. Si vous n'êtes pas un expert en expressions régulières, je vous recommande de consulter ceci datetime
à l'intérieur d'une fonction pour la faire fonctionner. J'obtenais une erreur d'importation au début du fichier, ce qui était bizarre. Cette liste est ensuite transmise à la fonction Écrire vers BigQuery, qui ajoute simplement nos données au tableau. Le code du travail Batch DataFlow et du travail Streaming DataFlow est indiqué ci-dessous. La seule différence entre le code batch et le code streaming est que par lots, nous lisons le CSV à partir de src_path
en utilisant la fonction ReadFromText
de Beam.
Travail DataFlow par lots (traitement par lots)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Tâche DataFlow en streaming (traitement de flux)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Démarrage du convoyeur
Nous pouvons gérer le pipeline de plusieurs manières différentes. Si nous le voulions, nous pourrions simplement l'exécuter localement à partir d'un terminal tout en nous connectant à GCP à distance.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Cependant, nous allons l'exécuter en utilisant DataFlow. Nous pouvons le faire en utilisant la commande ci-dessous en définissant les paramètres requis suivants.
project
— ID de votre projet GCP.runner
est un exécuteur de pipeline qui analysera votre programme et construira votre pipeline. Pour exécuter dans le cloud, vous devez spécifier un DataflowRunner.staging_location
- le chemin d'accès au stockage cloud Cloud Dataflow pour l'indexation des packages de code nécessaires aux processeurs effectuant le travail.temp_location
- chemin d'accès au stockage cloud Cloud Dataflow pour stocker les fichiers de tâches temporaires créés pendant l'exécution du pipeline.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Pendant que cette commande est en cours d'exécution, nous pouvons accéder à l'onglet DataFlow dans la console Google et afficher notre pipeline. Lorsque nous cliquons sur le pipeline, nous devrions voir quelque chose de similaire à la figure 4. À des fins de débogage, il peut être très utile d'accéder aux journaux, puis à Stackdriver pour afficher les journaux détaillés. Cela m'a aidé à résoudre des problèmes de pipeline dans un certain nombre de cas.
Figure 4 : Convoyeur à poutre
Accédez à nos données dans BigQuery
Nous devrions donc déjà avoir un pipeline en cours d’exécution avec des données circulant dans notre table. Pour tester cela, nous pouvons accéder à BigQuery et examiner les données. Après avoir utilisé la commande ci-dessous, vous devriez voir les premières lignes de l'ensemble de données. Maintenant que nous disposons des données stockées dans BigQuery, nous pouvons effectuer une analyse plus approfondie, partager les données avec des collègues et commencer à répondre aux questions commerciales.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5 : BigQuery
Conclusion
Nous espérons que cet article servira d'exemple utile pour créer un pipeline de données en streaming, ainsi que pour trouver des moyens de rendre les données plus accessibles. Stocker les données dans ce format nous offre de nombreux avantages. Nous pouvons maintenant commencer à répondre à des questions importantes telles que combien de personnes utilisent notre produit ? Votre base d’utilisateurs augmente-t-elle avec le temps ? Avec quels aspects du produit les gens interagissent-ils le plus ? Et y a-t-il des erreurs là où il ne devrait pas y en avoir ? Telles sont les questions qui intéresseront l’organisation. Sur la base des informations qui émergent des réponses à ces questions, nous pouvons améliorer le produit et accroître l'engagement des utilisateurs.
Beam est vraiment utile pour ce type d’exercice et propose également un certain nombre d’autres cas d’utilisation intéressants. Par exemple, vous souhaiterez peut-être analyser les données de cotation boursière en temps réel et effectuer des transactions basées sur l'analyse, peut-être avez-vous des données de capteurs provenant de véhicules et souhaitez-vous calculer des calculs de niveau de trafic. Vous pourriez également, par exemple, être une société de jeux qui collecte les données des utilisateurs et les utilise pour créer des tableaux de bord permettant de suivre les indicateurs clés. D'accord, messieurs, c'est un sujet pour un autre article, merci d'avoir lu, et pour ceux qui veulent voir le code complet, vous trouverez ci-dessous le lien vers mon GitHub.
C'est tout.
Source: habr.com