Herkese selam. Dersin öğrencilerine özel olarak hazırlanan makalenin son kısmının çevirisini sizlerle paylaşıyoruz.
Gerçek Zamanlı İşlem Hatları için Apache Beam ve DataFlow
Google Cloud'u kurma
Not: Python 3'te ardışık düzeni çalıştırmada sorun yaşadığım için ardışık düzeni çalıştırmak ve özel günlük verilerini yayınlamak için Google Cloud Shell'i kullandım. Google Cloud Shell, Apache Beam ile daha tutarlı olan Python 2'yi kullanıyor.
Boru hattını başlatmak için ayarlara biraz girmemiz gerekiyor. Daha önce GCP'yi kullanmamış olanlar için bu kılavuzda özetlenen aşağıdaki 6 adımı uygulamanız gerekecektir.
Bundan sonra komut dosyalarımızı Google Cloud Storage'a yüklememiz ve bunları Google Cloud Shel'imize kopyalamamız gerekecek. Bulut depolamaya yükleme oldukça önemsizdir (bir açıklama bulunabilir
Şekil 2
Dosyaları kopyalamak ve gerekli kütüphaneleri kurmak için ihtiyacımız olan komutlar aşağıda listelenmiştir.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Veritabanımızı ve tablomuzu oluşturuyoruz
Kurulumla ilgili tüm adımları tamamladıktan sonra yapmamız gereken bir sonraki şey BigQuery'de bir veri seti ve tablo oluşturmaktır. Bunu yapmanın birkaç yolu vardır ancak en basiti, önce bir veri kümesi oluşturarak Google Cloud konsolunu kullanmaktır. Aşağıdaki adımları takip edebilirsiniz
Şekil 3. Tablo düzeni
Kullanıcı günlüğü verilerini yayınlama
Pub/Sub, birden fazla bağımsız uygulamanın birbiriyle iletişim kurmasına olanak tanıdığı için işlem hattımızın kritik bir bileşenidir. Özellikle uygulamalar arasında mesaj gönderip almamızı sağlayan bir aracı görevi görüyor. Yapmamız gereken ilk şey konu oluşturmak. Konsolda Pub/Sub'a gidin ve KONU OLUŞTUR'a tıklayın.
Aşağıdaki kod, yukarıda tanımlanan günlük verilerini oluşturmak için komut dosyamızı çağırır ve ardından günlükleri Pub/Sub'a bağlayıp gönderir. Yapmamız gereken tek şey bir nesne yaratmak Yayıncıİstemci, yöntemi kullanarak konunun yolunu belirtin topic_path
ve işlevi çağırın publish
с topic_path
ve veriler. Lütfen ithalat yaptığımızı unutmayın. generate_log_line
senaryomuzdan stream_logs
, bu dosyaların aynı klasörde olduğundan emin olun, aksi takdirde içe aktarma hatası alırsınız. Daha sonra bunu aşağıdakileri kullanarak Google konsolumuz aracılığıyla çalıştırabiliriz:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Dosya çalışır çalışmaz aşağıdaki şekilde gösterildiği gibi log verilerinin konsola çıkışını görebileceğiz. Bu script kullanmadığımız sürece çalışacak CTRL + Ctamamlamak için.
Şekil 4. Çıkış publish_logs.py
Boru hattı kodumuzu yazma
Artık her şeyi hazırladığımıza göre işin eğlenceli kısmına başlayabiliriz: Beam ve Python kullanarak boru hattımızı kodlamak. Beam boru hattı oluşturmak için bir boru hattı nesnesi (p) oluşturmamız gerekir. Bir boru hattı nesnesi oluşturduğumuzda, operatörü kullanarak birden fazla işlevi birbiri ardına uygulayabiliriz. pipe (|)
. Genel olarak iş akışı aşağıdaki resimdeki gibidir.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Kodumuzda iki özel fonksiyon oluşturacağız. İşlev regex_clean
, işlevi kullanarak verileri tarayan ve PATTERNS listesine göre karşılık gelen satırı alan re.search
. İşlev virgülle ayrılmış bir dize döndürür. Düzenli ifade uzmanı değilseniz, buna göz atmanızı öneririm datetime
çalışmasını sağlamak için bir fonksiyonun içinde. Dosyanın başında bir içe aktarma hatası alıyordum, bu garipti. Bu liste daha sonra fonksiyona iletilir WriteToBigQuery, bu sadece verilerimizi tabloya ekler. Batch DataFlow Job ve Streaming DataFlow Job kodları aşağıda verilmiştir. Toplu iş ile akış kodu arasındaki tek fark, toplu olarak CSV'yi şu kaynaktan okumamızdır: src_path
işlevi kullanmak ReadFromText
Beam'den.
Toplu DataFlow İşi (toplu işleme)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
DataFlow İşinin Akışı (akış işleme)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Konveyörün başlatılması
Boru hattını birkaç farklı şekilde çalıştırabiliriz. İsteseydik, GCP'ye uzaktan giriş yaparken bunu bir terminalden yerel olarak çalıştırabilirdik.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Ancak biz DataFlow kullanarak çalıştıracağız. Bunu aşağıdaki komutu kullanarak aşağıdaki gerekli parametreleri ayarlayarak yapabiliriz.
project
— GCP projenizin kimliği.runner
programınızı analiz edecek ve boru hattınızı oluşturacak bir boru hattı çalıştırıcısıdır. Bulutta çalıştırmak için bir DataflowRunner belirtmeniz gerekir.staging_location
— işi gerçekleştiren işlemcilerin ihtiyaç duyduğu kod paketlerinin indekslenmesi için Cloud Dataflow bulut depolama alanına giden yol.temp_location
— işlem hattı çalışırken oluşturulan geçici iş dosyalarını depolamak için Cloud Dataflow bulut depolama alanına giden yol.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Bu komut çalışırken google konsolunda DataFlow sekmesine gidip işlem hattımızı görüntüleyebiliriz. İşlem hattına tıkladığımızda Şekil 4'e benzer bir şey görmeliyiz. Hata ayıklama amacıyla, ayrıntılı günlükleri görüntülemek için Günlükler'e ve ardından Stackdriver'a gitmek çok yararlı olabilir. Bu, bazı durumlarda boru hattı sorunlarını çözmemde bana yardımcı oldu.
Şekil 4: Kiriş konveyörü
BigQuery'deki verilerimize erişme
Bu nedenle, tablomuza veri akışı sağlayan bir boru hattının zaten çalışıyor olması gerekir. Bunu test etmek için BigQuery'ye gidip verilere bakabiliriz. Aşağıdaki komutu kullandıktan sonra veri kümesinin ilk birkaç satırını görmelisiniz. Veriler artık BigQuery'de depolandığına göre daha ayrıntılı analizler yapabilir, verileri meslektaşlarımızla paylaşabilir ve işle ilgili soruları yanıtlamaya başlayabiliriz.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Şekil 5: BigQuery
Sonuç
Bu gönderinin, veri akış hattı oluşturmanın yanı sıra verileri daha erişilebilir hale getirmenin yollarını bulma konusunda yararlı bir örnek olacağını umuyoruz. Verilerin bu formatta saklanması bize birçok avantaj sağlar. Artık ürünümüzü kaç kişi kullanıyor gibi önemli soruları yanıtlamaya başlayabiliriz. Kullanıcı tabanınız zamanla büyüyor mu? İnsanlar ürünün en çok hangi yönleriyle etkileşime giriyor? Ve olmaması gereken hatalar var mı? Bunlar organizasyonun ilgisini çekecek sorulardır. Bu soruların yanıtlarından elde edilen içgörülere dayanarak ürünü iyileştirebilir ve kullanıcı etkileşimini artırabiliriz.
Beam bu tür egzersizler için gerçekten kullanışlıdır ve başka ilginç kullanım durumlarına da sahiptir. Örneğin hisse senedi verilerini gerçek zamanlı analiz edip bu analize dayalı olarak işlem yapmak isteyebilirsiniz, belki elinizde araçlardan gelen sensör verileriniz var ve trafik seviyesi hesaplamaları yapmak istiyorsunuz. Örneğin, kullanıcı verilerini toplayan ve bunları temel ölçümleri izlemek üzere gösterge tabloları oluşturmak için kullanan bir oyun şirketi de olabilirsiniz. Tamam beyler, bu başka bir yazının konusu, okuduğunuz için teşekkürler ve kodun tamamını görmek isteyenler için GitHub'ımın bağlantısı aşağıda.
Hepsi bu.
Kaynak: habr.com