НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2

Π—Π΄Ρ€Π°Π²ΠΎ Π½Π° ситС. Π“ΠΎ сподСлувамС ΠΏΡ€Π΅Π²ΠΎΠ΄ΠΎΡ‚ Π½Π° послСдниот Π΄Π΅Π» ΠΎΠ΄ ΡΡ‚Π°Ρ‚ΠΈΡ˜Π°Ρ‚Π°, ΠΏΠΎΠ΄Π³ΠΎΡ‚Π²Π΅Π½ ΡΠΏΠ΅Ρ†ΠΈΡ˜Π°Π»Π½ΠΎ Π·Π° студСнтитС Π½Π° курсот. Π˜Π½ΠΆΠ΅Π½Π΅Ρ€ Π·Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ. ΠœΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π³ΠΎ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Ρ‚Π΅ ΠΏΡ€Π²ΠΈΠΎΡ‚ Π΄Π΅Π» Ρ‚ΡƒΠΊΠ°.

Apache Beam ΠΈ Data Flow Π·Π° Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ΠΈ Π²ΠΎ Ρ€Π΅Π°Π»Π½ΠΎ Π²Ρ€Π΅ΠΌΠ΅

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2

ΠŸΠΎΡΡ‚Π°Π²ΡƒΠ²Π°ΡšΠ΅ Π½Π° Google Cloud

Π—Π°Π±Π΅Π»Π΅ΡˆΠΊΠ°: Π“ΠΎ користСв Google Cloud Shell Π·Π° ΠΈΠ·Π²Ρ€ΡˆΡƒΠ²Π°ΡšΠ΅ Π½Π° Π½Π°Ρ„Ρ‚ΠΎΠ²ΠΎΠ΄ΠΎΡ‚ ΠΈ ΠΎΠ±Ρ˜Π°Π²ΡƒΠ²Π°ΡšΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° приспособСни Π΄Π½Π΅Π²Π½ΠΈΡ†ΠΈ бидСјќи ΠΈΠΌΠ°Π² ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠΈ со ΠΈΠ·Π²Ρ€ΡˆΡƒΠ²Π°ΡšΠ΅Ρ‚ΠΎ Π½Π° гасоводот Π²ΠΎ Python 3. Google Cloud Shell користи Python 2, ΡˆΡ‚ΠΎ Π΅ поконзистСнтно со Apache Beam.

Π—Π° Π΄Π° Π³ΠΎ Π·Π°ΠΏΠΎΡ‡Π½Π΅ΠΌΠ΅ гасоводот, Ρ‚Ρ€Π΅Π±Π° ΠΌΠ°Π»ΠΊΡƒ Π΄Π° ΠΊΠΎΠΏΠ°ΠΌΠ΅ Π²ΠΎ поставкитС. Π—Π° ΠΎΠ½ΠΈΠ΅ ΠΎΠ΄ вас ΠΊΠΎΠΈ ΠΏΡ€Π΅Ρ‚Ρ…ΠΎΠ΄Π½ΠΎ Π½Π΅ користСлС GCP, ќС Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π³ΠΈ слСдитС слСднитС 6 Ρ‡Π΅ΠΊΠΎΡ€ΠΈ Π½Π°Π²Π΅Π΄Π΅Π½ΠΈ Π²ΠΎ ΠΎΠ²Π° страница.

ПослС ΠΎΠ²Π°, ќС Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π³ΠΈ поставимС Π½Π°ΡˆΠΈΡ‚Π΅ скрипти Π½Π° Google Cloud Storage ΠΈ Π΄Π° Π³ΠΈ ΠΊΠΎΠΏΠΈΡ€Π°ΠΌΠ΅ Π²ΠΎ Π½Π°ΡˆΠΈΠΎΡ‚ Google Cloud Shel. ΠŸΠΎΡΡ‚Π°Π²ΡƒΠ²Π°ΡšΠ΅Ρ‚ΠΎ Π²ΠΎ ΡΠΊΠ»Π°Π΄ΠΈΡ€Π°ΡšΠ΅ ΠΎΠ±Π»Π°ΠΊ Π΅ ΠΏΡ€ΠΈΠ»ΠΈΡ‡Π½ΠΎ Ρ‚Ρ€ΠΈΠ²ΠΈΡ˜Π°Π»Π½ΠΎ (ΠΌΠΎΠΆΠ΅ Π΄Π° сС најдС опис Ρ‚ΡƒΠΊΠ°). Π—Π° Π΄Π° Π³ΠΈ ΠΊΠΎΠΏΠΈΡ€Π°ΠΌΠ΅ Π½Π°ΡˆΠΈΡ‚Π΅ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠΈ, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ ΠΎΡ‚Π²ΠΎΡ€ΠΈΠΌΠ΅ Google Cloud Shel ΠΎΠ΄ Π»Π΅Π½Ρ‚Π°Ρ‚Π° со Π°Π»Π°Ρ‚ΠΊΠΈ со ΠΊΠ»ΠΈΠΊΠ½ΡƒΠ²Π°ΡšΠ΅ Π½Π° ΠΏΡ€Π²Π°Ρ‚Π° ΠΈΠΊΠΎΠ½Π° Π»Π΅Π²ΠΎ Π½Π° Π‘Π»ΠΈΠΊΠ° 2 ΠΏΠΎΠ΄ΠΎΠ»Ρƒ.

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2
Π‘Π»ΠΈΠΊΠ° 2

ΠšΠΎΠΌΠ°Π½Π΄ΠΈΡ‚Π΅ ΡˆΡ‚ΠΎ Π½ΠΈ сС ΠΏΠΎΡ‚Ρ€Π΅Π±Π½ΠΈ Π·Π° Π΄Π° Π³ΠΈ ΠΊΠΎΠΏΠΈΡ€Π°ΠΌΠ΅ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠΈΡ‚Π΅ ΠΈ Π΄Π° Π³ΠΈ инсталирамС ΠΏΠΎΡ‚Ρ€Π΅Π±Π½ΠΈΡ‚Π΅ Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊΠΈ сС Π½Π°Π²Π΅Π΄Π΅Π½ΠΈ ΠΏΠΎΠ΄ΠΎΠ»Ρƒ.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

ΠšΡ€Π΅ΠΈΡ€Π°ΡšΠ΅ Π½Π° Π½Π°ΡˆΠ°Ρ‚Π° Π±Π°Π·Π° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΈ Ρ‚Π°Π±Π΅Π»Π°

ΠžΡ‚ΠΊΠ°ΠΊΠΎ ќС Π³ΠΈ Π·Π°Π²Ρ€ΡˆΠΈΠΌΠ΅ ситС Ρ‡Π΅ΠΊΠΎΡ€ΠΈ ΠΏΠΎΠ²Ρ€Π·Π°Π½ΠΈ со ΠΏΠΎΡΡ‚Π°Π²ΡƒΠ²Π°ΡšΠ΅Ρ‚ΠΎ, слСдното Π½Π΅ΡˆΡ‚ΠΎ ΡˆΡ‚ΠΎ Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π½Π°ΠΏΡ€Π°Π²ΠΈΠΌΠ΅ Π΅ Π΄Π° создадСмС Π±Π°Π·Π° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΈ Ρ‚Π°Π±Π΅Π»Π° Π²ΠΎ BigQuery. ΠŸΠΎΡΡ‚ΠΎΡ˜Π°Ρ‚ Π½Π΅ΠΊΠΎΠ»ΠΊΡƒ Π½Π°Ρ‡ΠΈΠ½ΠΈ Π΄Π° Π³ΠΎ Π½Π°ΠΏΡ€Π°Π²ΠΈΡ‚Π΅ ΠΎΠ²Π°, Π½ΠΎ Π½Π°Ρ˜Π΅Π΄Π½ΠΎΡΡ‚Π°Π²Π½ΠΈΠΎΡ‚ Π΅ Π΄Π° ја користитС ΠΊΠΎΠ½Π·ΠΎΠ»Π°Ρ‚Π° Π½Π° Google Cloud со ΠΏΡ€Π²ΠΎ создавањС Π½Π° Π±Π°Π·Π° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ. ΠœΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π³ΠΈ слСдитС Ρ‡Π΅ΠΊΠΎΡ€ΠΈΡ‚Π΅ ΠΏΠΎΠ΄ΠΎΠ»Ρƒ Π»ΠΈΠ½ΠΊΠ΄Π° ΠΊΡ€Π΅ΠΈΡ€Π°Ρ‚Π΅ Ρ‚Π°Π±Π΅Π»Π° со шСма. ΠΠ°ΡˆΠ°Ρ‚Π° маса ќС ΠΈΠΌΠ° 7 ΠΊΠΎΠ»ΠΎΠ½ΠΈ, ΡˆΡ‚ΠΎ ΠΎΠ΄Π³ΠΎΠ²Π°Ρ€Π° Π½Π° ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚ΠΈΡ‚Π΅ Π½Π° сСкој Π΄Π½Π΅Π²Π½ΠΈΠΊ Π½Π° корисници. Π—Π° погодност, ќС Π³ΠΈ Π΄Π΅Ρ„ΠΈΠ½ΠΈΡ€Π°ΠΌΠ΅ ситС ΠΊΠΎΠ»ΠΎΠ½ΠΈ ΠΊΠ°ΠΊΠΎ Π½ΠΈΠ·ΠΈ, освСн Π·Π° timelocal ΠΏΡ€ΠΎΠΌΠ΅Π½Π»ΠΈΠ²Π°Ρ‚Π°, ΠΈ ќС Π³ΠΈ ΠΈΠΌΠ΅Π½ΡƒΠ²Π°ΠΌΠ΅ спорСд ΠΏΡ€ΠΎΠΌΠ΅Π½Π»ΠΈΠ²ΠΈΡ‚Π΅ ΡˆΡ‚ΠΎ Π³ΠΈ Π³Π΅Π½Π΅Ρ€ΠΈΡ€Π°Π²ΠΌΠ΅ ΠΏΡ€Π΅Ρ‚Ρ…ΠΎΠ΄Π½ΠΎ. РаспорСдот Π½Π° Π½Π°ΡˆΠ°Ρ‚Π° Ρ‚Π°Π±Π΅Π»Π° Ρ‚Ρ€Π΅Π±Π° Π΄Π° ΠΈΠ·Π³Π»Π΅Π΄Π° ΠΊΠ°ΠΊΠΎ Π½Π° Π‘Π»ΠΈΠΊΠ° 3.

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2
Π‘Π»ΠΈΠΊΠ° 3. РаспорСд Π½Π° Ρ‚Π°Π±Π΅Π»Π°

ΠžΠ±Ρ˜Π°Π²ΡƒΠ²Π°ΡšΠ΅ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ Π΄Π½Π΅Π²Π½ΠΈΠΊΠΎΡ‚ Π½Π° корисникот

Pub/Sub Π΅ ΠΊΡ€ΠΈΡ‚ΠΈΡ‡Π½Π° ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚Π° Π½Π° Π½Π°ΡˆΠΈΠΎΡ‚ гасовод бидСјќи Π΄ΠΎΠ·Π²ΠΎΠ»ΡƒΠ²Π° повСќС нСзависни Π°ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ Π΄Π° ΠΊΠΎΠΌΡƒΠ½ΠΈΡ†ΠΈΡ€Π°Π°Ρ‚ Π΅Π΄Π½ΠΈ со Π΄Ρ€ΡƒΠ³ΠΈ. ΠšΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½ΠΎ, Ρ€Π°Π±ΠΎΡ‚ΠΈ ΠΊΠ°ΠΊΠΎ посрСдник кој Π½ΠΈ ΠΎΠ²ΠΎΠ·ΠΌΠΎΠΆΡƒΠ²Π° Π΄Π° ΠΈΡΠΏΡ€Π°ΡœΠ°ΠΌΠ΅ ΠΈ ΠΏΡ€ΠΈΠΌΠ°ΠΌΠ΅ ΠΏΠΎΡ€Π°ΠΊΠΈ ΠΏΠΎΠΌΠ΅Ρ“Ρƒ Π°ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈΡ‚Π΅. ΠŸΡ€Π²ΠΎΡ‚ΠΎ Π½Π΅ΡˆΡ‚ΠΎ ΡˆΡ‚ΠΎ Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π½Π°ΠΏΡ€Π°Π²ΠΈΠΌΠ΅ Π΅ Π΄Π° создадСмС Ρ‚Π΅ΠΌΠ°. Едноставно ΠΎΠ΄Π΅Ρ‚Π΅ Π²ΠΎ Pub/Sub Π²ΠΎ ΠΊΠΎΠ½Π·ΠΎΠ»Π°Ρ‚Π° ΠΈ ΠΊΠ»ΠΈΠΊΠ½Π΅Ρ‚Π΅ CREATE TOPIC.

ΠšΠΎΠ΄ΠΎΡ‚ ΠΏΠΎΠ΄ΠΎΠ»Ρƒ ја ΠΏΠΎΠ²ΠΈΠΊΡƒΠ²Π° Π½Π°ΡˆΠ°Ρ‚Π° скрипта Π΄Π° Π³ΠΈ Π³Π΅Π½Π΅Ρ€ΠΈΡ€Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ ΠΎΠ΄ Π΄Π½Π΅Π²Π½ΠΈΠΊΠΎΡ‚ Π΄Π΅Ρ„ΠΈΠ½ΠΈΡ€Π°Π½ΠΈ ΠΏΠΎΠ³ΠΎΡ€Π΅, Π° ΠΏΠΎΡ‚ΠΎΠ° Π³ΠΈ ΠΏΠΎΠ²Ρ€Π·ΡƒΠ²Π° ΠΈ ΠΈΡΠΏΡ€Π°ΡœΠ° Π΄Π½Π΅Π²Π½ΠΈΡ†ΠΈΡ‚Π΅ Π΄ΠΎ Pub/Sub. ЕдинствСното Π½Π΅ΡˆΡ‚ΠΎ ΡˆΡ‚ΠΎ Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π½Π°ΠΏΡ€Π°Π²ΠΈΠΌΠ΅ Π΅ Π΄Π° создадСмС ΠΎΠ±Ρ˜Π΅ΠΊΡ‚ PublisherClient, Π½Π°Π²Π΅Π΄Π΅Ρ‚Π΅ ја ΠΏΠ°Ρ‚Π΅ΠΊΠ°Ρ‚Π° Π΄ΠΎ Ρ‚Π΅ΠΌΠ°Ρ‚Π° ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄ΠΎΡ‚ topic_path ΠΈ ΠΏΠΎΠ²ΠΈΠΊΠ°Ρ˜Ρ‚Π΅ ја Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° publish с topic_path ΠΈ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ. Π’Π΅ ΠΌΠΎΠ»ΠΈΠΌΠ΅ ΠΈΠΌΠ°Ρ˜Ρ‚Π΅ ΠΏΡ€Π΅Π΄Π²ΠΈΠ΄ Π΄Π΅ΠΊΠ° Π½ΠΈΠ΅ ΡƒΠ²Π΅Π·ΡƒΠ²Π°ΠΌΠ΅ generate_log_line ΠΎΠ΄ Π½Π°ΡˆΠ΅Ρ‚ΠΎ сцСнарио stream_logs, Π·Π°Ρ‚ΠΎΠ° ΠΏΡ€ΠΎΠ²Π΅Ρ€Π΅Ρ‚Π΅ Π΄Π°Π»ΠΈ ΠΎΠ²ΠΈΠ΅ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠΈ сС Π²ΠΎ истата ΠΏΠ°ΠΏΠΊΠ°, ΠΈΠ½Π°ΠΊΡƒ ќС Π΄ΠΎΠ±ΠΈΠ΅Ρ‚Π΅ Π³Ρ€Π΅ΡˆΠΊΠ° ΠΏΡ€ΠΈ ΡƒΠ²ΠΎΠ·. ΠŸΠΎΡ‚ΠΎΠ° ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ ΠΈΠ·Π²Ρ€ΡˆΠΈΠΌΠ΅ ΠΎΠ²Π° ΠΏΡ€Π΅ΠΊΡƒ Π½Π°ΡˆΠ°Ρ‚Π° ΠΊΠΎΠ½Π·ΠΎΠ»Π° Π½Π° Google ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Π¨Ρ‚ΠΎΠΌ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠ°Ρ‚Π° ќС сС ΠΈΠ·Π²Ρ€ΡˆΠΈ, ќС ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ Π²ΠΈΠ΄ΠΈΠΌΠ΅ ΠΈΠ·Π»Π΅Π·ΠΎΡ‚ ΠΎΠ΄ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ ΠΎΠ΄ Π΄Π½Π΅Π²Π½ΠΈΠΊΠΎΡ‚ Π²ΠΎ ΠΊΠΎΠ½Π·ΠΎΠ»Π°Ρ‚Π°, ΠΊΠ°ΠΊΠΎ ΡˆΡ‚ΠΎ Π΅ ΠΏΡ€ΠΈΠΊΠ°ΠΆΠ°Π½ΠΎ Π½Π° сликата ΠΏΠΎΠ΄ΠΎΠ»Ρƒ. Оваа скрипта ќС Ρ€Π°Π±ΠΎΡ‚ΠΈ сС Π΄ΠΎΠ΄Π΅ΠΊΠ° Π½Π΅ ја користимС CTRL + CΠ΄Π° Π³ΠΎ Π΄ΠΎΠ²Ρ€ΡˆΠΈΡˆ.

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2
Блика 4. ИзлСз publish_logs.py

ΠŸΠΈΡˆΡƒΠ²Π°ΡšΠ΅ Π½Π° Π½Π°ΡˆΠΈΠΎΡ‚ ΠΊΠΎΠ΄ Π·Π° гасоводот

Π‘Π΅Π³Π° ΠΊΠΎΠ³Π° ΠΈΠΌΠ°ΠΌΠ΅ сè ΠΏΠΎΠ΄Π³ΠΎΡ‚Π²Π΅Π½ΠΎ, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ Π·Π°ΠΏΠΎΡ‡Π½Π΅ΠΌΠ΅ Π·Π°Π±Π°Π²Π½ΠΈΠΎΡ‚ Π΄Π΅Π» - ΠΊΠΎΠ΄ΠΈΡ€Π°ΡšΠ΅ Π½Π° Π½Π°ΡˆΠΈΠΎΡ‚ гасовод ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ Beam ΠΈ Python. Π—Π° Π΄Π° создадСмС Beam pipeline, Ρ‚Ρ€Π΅Π±Π° Π΄Π° создадСмС ΠΎΠ±Ρ˜Π΅ΠΊΡ‚ Π½Π° Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ (p). ΠžΡ‚ΠΊΠ°ΠΊΠΎ ќС создадСмС ΠΎΠ±Ρ˜Π΅ΠΊΡ‚ Π½Π° Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° ΠΏΡ€ΠΈΠΌΠ΅Π½ΠΈΠΌΠ΅ повСќС Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ Π΅Π΄Π½Π° ΠΏΠΎ Π΄Ρ€ΡƒΠ³Π° со помош Π½Π° ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΎΡ‚ pipe (|). Π’ΠΎ ΠΏΡ€ΠΈΠ½Ρ†ΠΈΠΏ, Ρ€Π°Π±ΠΎΡ‚Π½ΠΈΠΎΡ‚ Ρ‚Π΅ΠΊ ΠΈΠ·Π³Π»Π΅Π΄Π° ΠΊΠ°ΠΊΠΎ сликата ΠΏΠΎΠ΄ΠΎΠ»Ρƒ.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Π’ΠΎ Π½Π°ΡˆΠΈΠΎΡ‚ ΠΊΠΎΠ΄, ќС создадСмС Π΄Π²Π΅ сопствСни Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ. Π€ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π° regex_clean, кој Π³ΠΈ скСнира ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ ΠΈ Π³ΠΎ Π²Ρ€Π°ΡœΠ° соодвСтниот Ρ€Π΅Π΄ Π²Ρ€Π· основа Π½Π° списокот PATTERNS ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ ја Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° re.search. Π€ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° Π²Ρ€Π°ΡœΠ° Π½ΠΈΠ·Π° ΠΎΠ΄Π²ΠΎΠ΅Π½Π° со Π·Π°ΠΏΠΈΡ€ΠΊΠ°. Ако Π½Π΅ стС СкспСрт Π·Π° Ρ€Π΅Π΄ΠΎΠ²Π½ΠΎ ΠΈΠ·Ρ€Π°Π·ΡƒΠ²Π°ΡšΠ΅, ΠΏΡ€Π΅ΠΏΠΎΡ€Π°Ρ‡ΡƒΠ²Π°ΠΌ Π΄Π° Π³ΠΎ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΈΡ‚Π΅ ΠΎΠ²Π° упатство ΠΈ Π²Π΅ΠΆΠ±Π°Ρ˜Ρ‚Π΅ Π²ΠΎ Π±Π΅Π»Π΅ΠΆΠ½ΠΈΠΊ Π΄Π° Π³ΠΎ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΈΡ‚Π΅ ΠΊΠΎΠ΄ΠΎΡ‚. По ΠΎΠ²Π° Π΄Π΅Ρ„ΠΈΠ½ΠΈΡ€Π°ΠΌΠ΅ ΠΏΡ€ΠΈΠ»Π°Π³ΠΎΠ΄Π΅Π½Π° Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π° ParDo Π½Π°Ρ€Π΅Ρ‡Π΅Π½Π° ПодСли, ΡˆΡ‚ΠΎ Π΅ Π²Π°Ρ€ΠΈΡ˜Π°Ρ†ΠΈΡ˜Π° Π½Π° Beam Ρ‚Ρ€Π°Π½ΡΡ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡ˜Π°Ρ‚Π° Π·Π° ΠΏΠ°Ρ€Π°Π»Π΅Π»Π½Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ°. Π’ΠΎ Python, ΠΎΠ²Π° сС ΠΏΡ€Π°Π²ΠΈ Π½Π° посСбСн Π½Π°Ρ‡ΠΈΠ½ - ΠΌΠΎΡ€Π° Π΄Π° создадСмС класа ΡˆΡ‚ΠΎ ќС наслСди ΠΎΠ΄ класата DoFn Beam. Π€ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° Split Π³ΠΎ Π·Π΅ΠΌΠ° Π°Π½Π°Π»ΠΈΠ·ΠΈΡ€Π°Π½ΠΈΠΎΡ‚ Ρ€Π΅Π΄ ΠΎΠ΄ ΠΏΡ€Π΅Ρ‚Ρ…ΠΎΠ΄Π½Π°Ρ‚Π° Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π° ΠΈ Π²Ρ€Π°ΡœΠ° листа Π½Π° Ρ€Π΅Ρ‡Π½ΠΈΡ†ΠΈ со ΠΊΠΎΠΏΡ‡ΠΈΡšΠ° ΡˆΡ‚ΠΎ ΠΎΠ΄Π³ΠΎΠ²Π°Ρ€Π°Π°Ρ‚ Π½Π° ΠΈΠΌΠΈΡšΠ°Ρ‚Π° Π½Π° ΠΊΠΎΠ»ΠΎΠ½ΠΈΡ‚Π΅ Π²ΠΎ Π½Π°ΡˆΠ°Ρ‚Π° Ρ‚Π°Π±Π΅Π»Π° BigQuery. Има Π½Π΅ΡˆΡ‚ΠΎ ΡˆΡ‚ΠΎ Ρ‚Ρ€Π΅Π±Π° Π΄Π° сС Π·Π°Π±Π΅Π»Π΅ΠΆΠΈ Π·Π° ΠΎΠ²Π°Π° Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°: ΠΌΠΎΡ€Π°Π² Π΄Π° ΡƒΠ²Π΅Π·Π°ΠΌ datetime Π²Π½Π°Ρ‚Ρ€Π΅ Π²ΠΎ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π° Π·Π° Π΄Π° Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½ΠΈΡ€Π°. Π”ΠΎΠ±ΠΈΠ² Π³Ρ€Π΅ΡˆΠΊΠ° ΠΏΡ€ΠΈ ΡƒΠ²ΠΎΠ· Π½Π° ΠΏΠΎΡ‡Π΅Ρ‚ΠΎΠΊΠΎΡ‚ Π½Π° Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠ°Ρ‚Π°, ΡˆΡ‚ΠΎ бСшС Ρ‡ΡƒΠ΄Π½ΠΎ. Оваа листа ΠΏΠΎΡ‚ΠΎΠ° сС прСнСсува Π½Π° Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° WriteToBigQuery, ΡˆΡ‚ΠΎ Сдноставно Π³ΠΈ Π΄ΠΎΠ΄Π°Π²Π° Π½Π°ΡˆΠΈΡ‚Π΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π½Π° Ρ‚Π°Π±Π΅Π»Π°Ρ‚Π°. ΠšΠΎΠ΄ΠΎΡ‚ Π·Π° Batch DataFlow Job ΠΈ Streaming DataFlow Job Π΅ Π΄Π°Π΄Π΅Π½ ΠΏΠΎΠ΄ΠΎΠ»Ρƒ. ЕдинствСната Ρ€Π°Π·Π»ΠΈΠΊΠ° ΠΏΠΎΠΌΠ΅Ρ“Ρƒ сСрискиот ΠΈ стриминг ΠΊΠΎΠ΄ΠΎΡ‚ Π΅ Ρ‚ΠΎΠ° ΡˆΡ‚ΠΎ Π²ΠΎ ΡΠ΅Ρ€ΠΈΡ˜Π° Π³ΠΎ Ρ‡ΠΈΡ‚Π°ΠΌΠ΅ CSV ΠΎΠ΄ src_pathΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ ја Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡ˜Π°Ρ‚Π° ReadFromText ΠΎΠ΄ Π—Ρ€Π°ΠΊ.

Π Π°Π±ΠΎΡ‚Π° Π·Π° сСриски ΠΏΡ€ΠΎΡ‚ΠΎΠΊ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ (сСриска ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ°)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Π Π°Π±ΠΎΡ‚Π° Π·Π° прСнос Π½Π° ΠΏΡ€ΠΎΡ‚ΠΎΠΊ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ (ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° прСнос)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Π‘Ρ‚Π°Ρ€Ρ‚ΡƒΠ²Π°ΡšΠ΅ Π½Π° транспортСрот

МоТСмС Π΄Π° Π³ΠΎ Π²ΠΎΠ΄ΠΈΠΌΠ΅ гасоводот Π½Π° Π½Π΅ΠΊΠΎΠ»ΠΊΡƒ Ρ€Π°Π·Π»ΠΈΡ‡Π½ΠΈ Π½Π°Ρ‡ΠΈΠ½ΠΈ. Ако сакамС, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ само Π΄Π° Π³ΠΎ ΠΈΠ·Π²Ρ€ΡˆΠΈΠΌΠ΅ Π»ΠΎΠΊΠ°Π»Π½ΠΎ ΠΎΠ΄ Ρ‚Π΅Ρ€ΠΌΠΈΠ½Π°Π» Π΄ΠΎΠ΄Π΅ΠΊΠ° сС Π½Π°Ρ˜Π°Π²ΡƒΠ²Π°ΠΌΠ΅ Π½Π° GCP ΠΎΠ΄ Π΄Π°Π»Π΅Ρ‡ΠΈΠ½Π°.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Π‘Π΅ΠΏΠ°ΠΊ, Π½ΠΈΠ΅ ќС Π³ΠΎ ΠΈΠ·Π²Ρ€ΡˆΠΈΠΌΠ΅ ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ DataFlow. МоТСмС Π΄Π° Π³ΠΎ Π½Π°ΠΏΡ€Π°Π²ΠΈΠΌΠ΅ Ρ‚ΠΎΠ° ΠΊΠΎΡ€ΠΈΡΡ‚Π΅Ρ˜ΡœΠΈ ја ΠΊΠΎΠΌΠ°Π½Π΄Π°Ρ‚Π° ΠΏΠΎΠ΄ΠΎΠ»Ρƒ со ΠΏΠΎΡΡ‚Π°Π²ΡƒΠ²Π°ΡšΠ΅ Π½Π° слСднитС ΠΏΠΎΡ‚Ρ€Π΅Π±Π½ΠΈ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ΠΈ.

  • project β€” ID Π½Π° Π²Π°ΡˆΠΈΠΎΡ‚ GCP ΠΏΡ€ΠΎΠ΅ΠΊΡ‚.
  • runner Π΅ Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ ΡˆΡ‚ΠΎ ќС ја Π°Π½Π°Π»ΠΈΠ·ΠΈΡ€Π° Π²Π°ΡˆΠ°Ρ‚Π° ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠ° ΠΈ ќС Π³ΠΎ конструира Π²Π°ΡˆΠΈΠΎΡ‚ гасовод. Π—Π° Π΄Π° Ρ€Π°Π±ΠΎΡ‚ΠΈ Π²ΠΎ ΠΎΠ±Π»Π°ΠΊΠΎΡ‚, ΠΌΠΎΡ€Π° Π΄Π° Π½Π°Π²Π΅Π΄Π΅Ρ‚Π΅ DataflowRunner.
  • staging_location β€” ΠΏΠ°Ρ‚Π΅ΠΊΠ°Ρ‚Π° Π΄ΠΎ ΠΎΠ±Π»Π°ΠΊΠΎΡ‚ Π·Π° ΡΠΊΠ»Π°Π΄ΠΈΡ€Π°ΡšΠ΅ Π½Π° Cloud Dataflow Π·Π° ΠΈΠ½Π΄Π΅ΠΊΡΠΈΡ€Π°ΡšΠ΅ Π½Π° ΠΏΠ°ΠΊΠ΅Ρ‚ΠΈ со ΠΊΠΎΠ΄ΠΎΠ²ΠΈ ΠΏΠΎΡ‚Ρ€Π΅Π±Π½ΠΈ Π½Π° процСсоритС ΡˆΡ‚ΠΎ ја Π²Ρ€ΡˆΠ°Ρ‚ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚Π°.
  • temp_location β€” ΠΏΠ°Ρ‚Π΅ΠΊΠ° Π΄ΠΎ ΡΠΊΠ»Π°Π΄ΠΈΡˆΡ‚Π΅Ρ‚ΠΎ Π²ΠΎ ΠΎΠ±Π»Π°ΠΊ Π½Π° Cloud Dataflow Π·Π° ΡΠΊΠ»Π°Π΄ΠΈΡ€Π°ΡšΠ΅ Π½Π° ΠΏΡ€ΠΈΠ²Ρ€Π΅ΠΌΠ΅Π½ΠΈ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠΈ Π·Π° Ρ€Π°Π±ΠΎΡ‚Π½ΠΈ Π·Π°Π΄Π°Ρ‡ΠΈ создадСни Π΄ΠΎΠ΄Π΅ΠΊΠ° Ρ€Π°Π±ΠΎΡ‚ΠΈ Π½Π°Ρ„Ρ‚ΠΎΠ²ΠΎΠ΄ΠΎΡ‚.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Π”ΠΎΠ΄Π΅ΠΊΠ° ΠΎΠ²Π°Π° ΠΊΠΎΠΌΠ°Π½Π΄Π° Ρ€Π°Π±ΠΎΡ‚ΠΈ, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° ΠΎΠ΄ΠΈΠΌΠ΅ Π²ΠΎ Ρ‚Π°Π±ΡƒΠ»Π°Ρ‚ΠΎΡ€ΠΎΡ‚ DataFlow Π²ΠΎ ΠΊΠΎΠ½Π·ΠΎΠ»Π°Ρ‚Π° Π½Π° Google ΠΈ Π΄Π° ја Π²ΠΈΠ΄ΠΈΠΌΠ΅ Π½Π°ΡˆΠ°Ρ‚Π° линија. Кога ΠΊΠ»ΠΈΠΊΠ½ΡƒΠ²Π°ΠΌΠ΅ Π½Π° Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ΠΎΡ‚, Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π²ΠΈΠ΄ΠΈΠΌΠ΅ Π½Π΅ΡˆΡ‚ΠΎ слично Π½Π° Π‘Π»ΠΈΠΊΠ° 4. Π—Π° Ρ†Π΅Π»ΠΈΡ‚Π΅ Π½Π° Π΄Π΅Π±Π°Π³ΠΈΡ€Π°ΡšΠ΅, ΠΌΠΎΠΆΠ΅ Π΄Π° Π±ΠΈΠ΄Π΅ ΠΌΠ½ΠΎΠ³Ρƒ корисно Π΄Π° ΠΎΡ‚ΠΈΠ΄Π΅Ρ‚Π΅ Π²ΠΎ Logs, Π° ΠΏΠΎΡ‚ΠΎΠ° Π²ΠΎ Stackdriver Π·Π° Π΄Π° Π³ΠΈ Π²ΠΈΠ΄ΠΈΡ‚Π΅ Π΄Π΅Ρ‚Π°Π»Π½ΠΈΡ‚Π΅ Π΄Π½Π΅Π²Π½ΠΈΡ†ΠΈ. Ова ΠΌΠΈ ΠΏΠΎΠΌΠΎΠ³Π½Π° Π΄Π° Π³ΠΈ Ρ€Π΅ΡˆΠ°ΠΌ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠΈΡ‚Π΅ со гасоводот Π²ΠΎ Π³ΠΎΠ»Π΅ΠΌ Π±Ρ€ΠΎΡ˜ случаи.

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2
Π‘Π»ΠΈΠΊΠ° 4: ВранспортСр со Π·Ρ€Π°ΠΊ

ΠŸΡ€ΠΈΡΡ‚Π°ΠΏΠ΅Ρ‚Π΅ Π΄ΠΎ Π½Π°ΡˆΠΈΡ‚Π΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π²ΠΎ BigQuery

Π—Π½Π°Ρ‡ΠΈ, вСќС Ρ‚Ρ€Π΅Π±Π° Π΄Π° ΠΈΠΌΠ°ΠΌΠ΅ Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ ΡˆΡ‚ΠΎ Ρ€Π°Π±ΠΎΡ‚ΠΈ со ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΡˆΡ‚ΠΎ сС Π²Π»Π΅Π²Π°Π°Ρ‚ Π²ΠΎ Π½Π°ΡˆΠ°Ρ‚Π° Ρ‚Π°Π±Π΅Π»Π°. Π—Π° Π΄Π° Π³ΠΎ тСстирамС ΠΎΠ²Π°, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° ΠΎΠ΄ΠΈΠΌΠ΅ Π½Π° BigQuery ΠΈ Π΄Π° Π³ΠΈ ΠΏΠΎΠ³Π»Π΅Π΄Π½Π΅ΠΌΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅. ΠžΡ‚ΠΊΠ°ΠΊΠΎ ќС ја користитС ΠΊΠΎΠΌΠ°Π½Π΄Π°Ρ‚Π° ΠΏΠΎΠ΄ΠΎΠ»Ρƒ, Ρ‚Ρ€Π΅Π±Π° Π΄Π° Π³ΠΈ Π²ΠΈΠ΄ΠΈΡ‚Π΅ ΠΏΡ€Π²ΠΈΡ‚Π΅ Π½Π΅ΠΊΠΎΠ»ΠΊΡƒ Ρ€Π΅Π΄ΠΎΠ²ΠΈ ΠΎΠ΄ сСтот. Π‘Π΅Π³Π° ΠΊΠΎΠ³Π° Π³ΠΈ ΠΈΠΌΠ°ΠΌΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ Π·Π°Ρ‡ΡƒΠ²Π°Π½ΠΈ Π²ΠΎ BigQuery, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° спровСдСмС Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»Π½Π° Π°Π½Π°Π»ΠΈΠ·Π°, ΠΊΠ°ΠΊΠΎ ΠΈ Π΄Π° Π³ΠΈ сподСлимС ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ со ΠΊΠΎΠ»Π΅Π³ΠΈΡ‚Π΅ ΠΈ Π΄Π° ΠΏΠΎΡ‡Π½Π΅ΠΌΠ΅ Π΄Π° ΠΎΠ΄Π³ΠΎΠ²Π°Ρ€Π°ΠΌΠ΅ Π½Π° Π΄Π΅Π»ΠΎΠ²Π½ΠΈ ΠΏΡ€Π°ΡˆΠ°ΡšΠ°.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

НиС создавамС Ρ†Π΅Π²ΠΊΠΎΠ²ΠΎΠ΄ Π·Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° ΠΏΠΎΡ‚ΠΎΠΊ. Π”Π΅Π» 2
Π‘Π»ΠΈΠΊΠ° 5: BigQuery

Π—Π°ΠΊΠ»ΡƒΡ‡ΠΎΠΊ

Π‘Π΅ Π½Π°Π΄Π΅Π²Π°ΠΌΠ΅ Π΄Π΅ΠΊΠ° ΠΎΠ²Π°Π° објава ќС послуТи ΠΊΠ°ΠΊΠΎ корисСн ΠΏΡ€ΠΈΠΌΠ΅Ρ€ Π·Π° создавањС Π½Π° ΠΏΡ€ΠΎΡ‚ΠΎΠΊ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° стриминг, ΠΊΠ°ΠΊΠΎ ΠΈ Π·Π° ΠΈΠ·Π½Π°ΠΎΡ“Π°ΡšΠ΅ Π½Π°Ρ‡ΠΈΠ½ΠΈ Π΄Π° Π³ΠΈ Π½Π°ΠΏΡ€Π°Π²ΠΈΠΌΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ подостапни. Π‘ΠΊΠ»Π°Π΄ΠΈΡ€Π°ΡšΠ΅Ρ‚ΠΎ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π²ΠΎ овој Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ Π½ΠΈ Π΄Π°Π²Π° ΠΌΠ½ΠΎΠ³Ρƒ прСдности. Π‘Π΅Π³Π° ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° ΠΏΠΎΡ‡Π½Π΅ΠΌΠ΅ Π΄Π° ΠΎΠ΄Π³ΠΎΠ²Π°Ρ€Π°ΠΌΠ΅ Π½Π° Π²Π°ΠΆΠ½ΠΈ ΠΏΡ€Π°ΡˆΠ°ΡšΠ° ΠΊΠ°ΠΊΠΎ Π½Π° ΠΏΡ€ΠΈΠΌΠ΅Ρ€ ΠΊΠΎΠ»ΠΊΡƒ Π»ΡƒΡ“Π΅ Π³ΠΎ користат Π½Π°ΡˆΠΈΠΎΡ‚ ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄? Π”Π°Π»ΠΈ Π²Π°ΡˆΠ°Ρ‚Π° корисничка Π±Π°Π·Π° растС со Ρ‚Π΅ΠΊΠΎΡ‚ Π½Π° Π²Ρ€Π΅ΠΌΠ΅Ρ‚ΠΎ? Π‘ΠΎ ΠΊΠΎΠΈ аспСкти Π½Π° ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΎΡ‚ Π»ΡƒΡ“Π΅Ρ‚ΠΎ Π½Π°Ρ˜ΠΌΠ½ΠΎΠ³Ρƒ ΠΊΠΎΠΌΡƒΠ½ΠΈΡ†ΠΈΡ€Π°Π°Ρ‚? И Π΄Π°Π»ΠΈ ΠΈΠΌΠ° Π³Ρ€Π΅ΡˆΠΊΠΈ Ρ‚Π°ΠΌΡƒ ΠΊΠ°Π΄Π΅ ΡˆΡ‚ΠΎ Π½Π΅ Ρ‚Ρ€Π΅Π±Π° Π΄Π° ΠΈΠΌΠ°? Ова сС ΠΏΡ€Π°ΡˆΠ°ΡšΠ°Ρ‚Π° ΠΊΠΎΠΈ ќС Π±ΠΈΠ΄Π°Ρ‚ ΠΎΠ΄ интСрСс Π·Π° ΠΎΡ€Π³Π°Π½ΠΈΠ·Π°Ρ†ΠΈΡ˜Π°Ρ‚Π°. Π’Ρ€Π· основа Π½Π° ΡƒΠ²ΠΈΠ΄ΠΈΡ‚Π΅ ΡˆΡ‚ΠΎ ΠΏΡ€ΠΎΠΈΠ·Π»Π΅Π³ΡƒΠ²Π°Π°Ρ‚ ΠΎΠ΄ ΠΎΠ΄Π³ΠΎΠ²ΠΎΡ€ΠΈΡ‚Π΅ Π½Π° ΠΎΠ²ΠΈΠ΅ ΠΏΡ€Π°ΡˆΠ°ΡšΠ°, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ ΠΏΠΎΠ΄ΠΎΠ±Ρ€ΠΈΠΌΠ΅ ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΎΡ‚ ΠΈ Π΄Π° Π³ΠΎ Π·Π³ΠΎΠ»Π΅ΠΌΠΈΠΌΠ΅ Π°Π½Π³Π°ΠΆΠΌΠ°Π½ΠΎΡ‚ Π½Π° корисницитС.

Beam Π΅ навистина корисСн Π·Π° овој Ρ‚ΠΈΠΏ Π½Π° вСТбањС, Π° ΠΈΠΌΠ° ΠΈ Π³ΠΎΠ»Π΅ΠΌ Π±Ρ€ΠΎΡ˜ Π΄Ρ€ΡƒΠ³ΠΈ интСрСсни случаи Π½Π° ΡƒΠΏΠΎΡ‚Ρ€Π΅Π±Π°. На ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ΠΌΠΎΠΆΠ΅Π±ΠΈ ќС сакатС Π΄Π° Π³ΠΈ Π°Π½Π°Π»ΠΈΠ·ΠΈΡ€Π°Ρ‚Π΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ ΠΎΠ΄ ΠΊΡ€Π»Π΅ΠΆΠΈΡ‚Π΅ Π·Π° Π°ΠΊΡ†ΠΈΠΈ Π²ΠΎ Ρ€Π΅Π°Π»Π½ΠΎ Π²Ρ€Π΅ΠΌΠ΅ ΠΈ Π΄Π° ΠΏΡ€Π°Π²ΠΈΡ‚Π΅ Ρ‚Ρ€Π³ΡƒΠ²Π°ΡšΠ΅ Π²Ρ€Π· основа Π½Π° Π°Π½Π°Π»ΠΈΠ·Π°Ρ‚Π°, ΠΌΠΎΠΆΠ΅Π±ΠΈ ΠΈΠΌΠ°Ρ‚Π΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ сСнзоритС ΠΊΠΎΠΈ Π΄ΠΎΠ°Ρ“Π°Π°Ρ‚ ΠΎΠ΄ Π²ΠΎΠ·ΠΈΠ»Π°Ρ‚Π° ΠΈ сакатС Π΄Π° Π³ΠΈ прСсмСтатС прСсмСткитС Π½Π° Π½ΠΈΠ²ΠΎΡ‚ΠΎ Π½Π° ΡΠΎΠΎΠ±Ρ€Π°ΡœΠ°Ρ˜. ΠœΠΎΠΆΠ΅Ρ‚Π΅ исто Ρ‚Π°ΠΊΠ°, Π½Π° ΠΏΡ€ΠΈΠΌΠ΅Ρ€, Π΄Π° Π±ΠΈΠ΄Π΅Ρ‚Π΅ компанија Π·Π° ΠΈΠ³Ρ€ΠΈ ΡˆΡ‚ΠΎ собира кориснички ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΈ Π³ΠΈ користи Π·Π° создавањС ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»Π½ΠΈ Ρ‚Π°Π±Π»ΠΈ Π·Π° слСдСњС Π½Π° ΠΊΠ»ΡƒΡ‡Π½ΠΈΡ‚Π΅ ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊΠΈ. Π”ΠΎΠ±Ρ€ΠΎ, господа, ΠΎΠ²Π° Π΅ Ρ‚Π΅ΠΌΠ° Π·Π° ΡƒΡˆΡ‚Π΅ Π΅Π΄Π΅Π½ пост, Π±Π»Π°Π³ΠΎΠ΄Π°Ρ€Π°ΠΌ ΡˆΡ‚ΠΎ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π²Ρ‚Π΅, Π° Π·Π° ΠΎΠ½ΠΈΠ΅ ΠΊΠΎΠΈ сакаат Π΄Π° Π³ΠΎ Π²ΠΈΠ΄Π°Ρ‚ цСлосниот ΠΊΠΎΠ΄, ΠΏΠΎΠ΄ΠΎΠ»Ρƒ Π΅ Π»ΠΈΠ½ΠΊΠΎΡ‚ Π΄ΠΎ ΠΌΠΎΡ˜ΠΎΡ‚ GitHub.

https://github.com/DFoly/User_log_pipeline

Π’ΠΎΠ° Π΅ сè. ΠŸΡ€ΠΎΡ‡ΠΈΡ‚Π°Ρ˜Ρ‚Π΅ Π³ΠΎ ΠΏΡ€Π²ΠΈΠΎΡ‚ Π΄Π΅Π».

Π˜Π·Π²ΠΎΡ€: www.habr.com

Π”ΠΎΠ΄Π°Π΄Π΅Ρ‚Π΅ ΠΊΠΎΠΌΠ΅Π½Ρ‚Π°Ρ€