Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2

Herkese selam. Dersin öğrencilerine özel olarak hazırlanan makalenin son kısmının çevirisini sizlerle paylaşıyoruz. Veri Mühendisi. İlk bölümü okuyabilirsiniz burada.

Gerçek Zamanlı İşlem Hatları için Apache Beam ve DataFlow

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2

Google Cloud'u kurma

Not: Python 3'te ardışık düzeni çalıştırmada sorun yaşadığım için ardışık düzeni çalıştırmak ve özel günlük verilerini yayınlamak için Google Cloud Shell'i kullandım. Google Cloud Shell, Apache Beam ile daha tutarlı olan Python 2'yi kullanıyor.

Boru hattını başlatmak için ayarlara biraz girmemiz gerekiyor. Daha önce GCP'yi kullanmamış olanlar için bu kılavuzda özetlenen aşağıdaki 6 adımı uygulamanız gerekecektir. sayfa.

Bundan sonra komut dosyalarımızı Google Cloud Storage'a yüklememiz ve bunları Google Cloud Shel'imize kopyalamamız gerekecek. Bulut depolamaya yükleme oldukça önemsizdir (bir açıklama bulunabilir burada). Dosyalarımızı kopyalamak için aşağıdaki Şekil 2'de soldaki ilk simgeye tıklayarak araç çubuğundan Google Cloud Shel'i açabiliriz.

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2
Şekil 2

Dosyaları kopyalamak ve gerekli kütüphaneleri kurmak için ihtiyacımız olan komutlar aşağıda listelenmiştir.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Veritabanımızı ve tablomuzu oluşturuyoruz

Kurulumla ilgili tüm adımları tamamladıktan sonra yapmamız gereken bir sonraki şey BigQuery'de bir veri seti ve tablo oluşturmaktır. Bunu yapmanın birkaç yolu vardır ancak en basiti, önce bir veri kümesi oluşturarak Google Cloud konsolunu kullanmaktır. Aşağıdaki adımları takip edebilirsiniz bağlantışema içeren bir tablo oluşturmak için. Masamız olacak 7 sütun, her kullanıcı günlüğünün bileşenlerine karşılık gelir. Kolaylık sağlamak için, timelocal değişkeni dışındaki tüm sütunları dize olarak tanımlayacağız ve bunları daha önce oluşturduğumuz değişkenlere göre adlandıracağız. Tablomuzun düzeni Şekil 3'teki gibi görünmelidir.

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2
Şekil 3. Tablo düzeni

Kullanıcı günlüğü verilerini yayınlama

Pub/Sub, birden fazla bağımsız uygulamanın birbiriyle iletişim kurmasına olanak tanıdığı için işlem hattımızın kritik bir bileşenidir. Özellikle uygulamalar arasında mesaj gönderip almamızı sağlayan bir aracı görevi görüyor. Yapmamız gereken ilk şey konu oluşturmak. Konsolda Pub/Sub'a gidin ve KONU OLUŞTUR'a tıklayın.

Aşağıdaki kod, yukarıda tanımlanan günlük verilerini oluşturmak için komut dosyamızı çağırır ve ardından günlükleri Pub/Sub'a bağlayıp gönderir. Yapmamız gereken tek şey bir nesne yaratmak Yayıncıİstemci, yöntemi kullanarak konunun yolunu belirtin topic_path ve işlevi çağırın publish с topic_path ve veriler. Lütfen ithalat yaptığımızı unutmayın. generate_log_line senaryomuzdan stream_logs, bu dosyaların aynı klasörde olduğundan emin olun, aksi takdirde içe aktarma hatası alırsınız. Daha sonra bunu aşağıdakileri kullanarak Google konsolumuz aracılığıyla çalıştırabiliriz:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Dosya çalışır çalışmaz aşağıdaki şekilde gösterildiği gibi log verilerinin konsola çıkışını görebileceğiz. Bu script kullanmadığımız sürece çalışacak CTRL + Ctamamlamak için.

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2
Şekil 4. Çıkış publish_logs.py

Boru hattı kodumuzu yazma

Artık her şeyi hazırladığımıza göre işin eğlenceli kısmına başlayabiliriz: Beam ve Python kullanarak boru hattımızı kodlamak. Beam boru hattı oluşturmak için bir boru hattı nesnesi (p) oluşturmamız gerekir. Bir boru hattı nesnesi oluşturduğumuzda, operatörü kullanarak birden fazla işlevi birbiri ardına uygulayabiliriz. pipe (|). Genel olarak iş akışı aşağıdaki resimdeki gibidir.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Kodumuzda iki özel fonksiyon oluşturacağız. İşlev regex_clean, işlevi kullanarak verileri tarayan ve PATTERNS listesine göre karşılık gelen satırı alan re.search. İşlev virgülle ayrılmış bir dize döndürür. Düzenli ifade uzmanı değilseniz, buna göz atmanızı öneririm öğretici ve kodu kontrol etmek için bir not defterinde pratik yapın. Bundan sonra adı verilen özel bir ParDo işlevi tanımlarız. Bölünmüşparalel işleme için Işın dönüşümünün bir çeşididir. Python'da bu özel bir şekilde yapılır; DoFn Beam sınıfından miras alan bir sınıf yaratmalıyız. Split işlevi, önceki işlevden ayrıştırılan satırı alır ve BigQuery tablomuzdaki sütun adlarına karşılık gelen tuşların bulunduğu bir sözlük listesi döndürür. Bu işlevle ilgili dikkat edilmesi gereken bir şey var: İçe aktarmam gerekiyordu datetime çalışmasını sağlamak için bir fonksiyonun içinde. Dosyanın başında bir içe aktarma hatası alıyordum, bu garipti. Bu liste daha sonra fonksiyona iletilir WriteToBigQuery, bu sadece verilerimizi tabloya ekler. Batch DataFlow Job ve Streaming DataFlow Job kodları aşağıda verilmiştir. Toplu iş ile akış kodu arasındaki tek fark, toplu olarak CSV'yi şu kaynaktan okumamızdır: src_pathişlevi kullanmak ReadFromText Beam'den.

Toplu DataFlow İşi (toplu işleme)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

DataFlow İşinin Akışı (akış işleme)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Konveyörün başlatılması

Boru hattını birkaç farklı şekilde çalıştırabiliriz. İsteseydik, GCP'ye uzaktan giriş yaparken bunu bir terminalden yerel olarak çalıştırabilirdik.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Ancak biz DataFlow kullanarak çalıştıracağız. Bunu aşağıdaki komutu kullanarak aşağıdaki gerekli parametreleri ayarlayarak yapabiliriz.

  • project — GCP projenizin kimliği.
  • runner programınızı analiz edecek ve boru hattınızı oluşturacak bir boru hattı çalıştırıcısıdır. Bulutta çalıştırmak için bir DataflowRunner belirtmeniz gerekir.
  • staging_location — işi gerçekleştiren işlemcilerin ihtiyaç duyduğu kod paketlerinin indekslenmesi için Cloud Dataflow bulut depolama alanına giden yol.
  • temp_location — işlem hattı çalışırken oluşturulan geçici iş dosyalarını depolamak için Cloud Dataflow bulut depolama alanına giden yol.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Bu komut çalışırken google konsolunda DataFlow sekmesine gidip işlem hattımızı görüntüleyebiliriz. İşlem hattına tıkladığımızda Şekil 4'e benzer bir şey görmeliyiz. Hata ayıklama amacıyla, ayrıntılı günlükleri görüntülemek için Günlükler'e ve ardından Stackdriver'a gitmek çok yararlı olabilir. Bu, bazı durumlarda boru hattı sorunlarını çözmemde bana yardımcı oldu.

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2
Şekil 4: Kiriş konveyörü

BigQuery'deki verilerimize erişme

Bu nedenle, tablomuza veri akışı sağlayan bir boru hattının zaten çalışıyor olması gerekir. Bunu test etmek için BigQuery'ye gidip verilere bakabiliriz. Aşağıdaki komutu kullandıktan sonra veri kümesinin ilk birkaç satırını görmelisiniz. Veriler artık BigQuery'de depolandığına göre daha ayrıntılı analizler yapabilir, verileri meslektaşlarımızla paylaşabilir ve işle ilgili soruları yanıtlamaya başlayabiliriz.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Bir akış veri işleme hattı oluşturuyoruz. Bölüm 2
Şekil 5: BigQuery

Sonuç

Bu gönderinin, veri akış hattı oluşturmanın yanı sıra verileri daha erişilebilir hale getirmenin yollarını bulma konusunda yararlı bir örnek olacağını umuyoruz. Verilerin bu formatta saklanması bize birçok avantaj sağlar. Artık ürünümüzü kaç kişi kullanıyor gibi önemli soruları yanıtlamaya başlayabiliriz. Kullanıcı tabanınız zamanla büyüyor mu? İnsanlar ürünün en çok hangi yönleriyle etkileşime giriyor? Ve olmaması gereken hatalar var mı? Bunlar organizasyonun ilgisini çekecek sorulardır. Bu soruların yanıtlarından elde edilen içgörülere dayanarak ürünü iyileştirebilir ve kullanıcı etkileşimini artırabiliriz.

Beam bu tür egzersizler için gerçekten kullanışlıdır ve başka ilginç kullanım durumlarına da sahiptir. Örneğin hisse senedi verilerini gerçek zamanlı analiz edip bu analize dayalı olarak işlem yapmak isteyebilirsiniz, belki elinizde araçlardan gelen sensör verileriniz var ve trafik seviyesi hesaplamaları yapmak istiyorsunuz. Örneğin, kullanıcı verilerini toplayan ve bunları temel ölçümleri izlemek üzere gösterge tabloları oluşturmak için kullanan bir oyun şirketi de olabilirsiniz. Tamam beyler, bu başka bir yazının konusu, okuduğunuz için teşekkürler ve kodun tamamını görmek isteyenler için GitHub'ımın bağlantısı aşağıda.

https://github.com/DFoly/User_log_pipeline

Hepsi bu. Birinci bölümü okuyun.

Kaynak: habr.com

Yorum ekle