Здравейте всички. Приятели, споделяме с вас превод на статията, подготвен специално за студентите от курса. . Отивам!

Apache Beam и DataFlow за тръбопроводи в реално време
Днешната публикация се основава на задача, която наскоро реших на работа. Бях много щастлив да го внедря и да опиша свършената работа във формат на блог публикация, защото ми даде възможност да практикувам инженеринг на данни, както и да направя нещо, което би било много полезно за моя екип. Неотдавна открих, че имаме доста голям потребителски регистър, съхранен в нашите системи, свързан с един от нашите продукти за данни. Оказа се, че никой не използва тези данни, така че веднага се заинтересувах какво можем да научим, ако започнем да ги анализираме редовно. По пътя обаче имаше няколко проблема. Първият проблем беше, че данните се съхраняваха в много различни текстови файлове, които не бяха незабавно достъпни за анализ. Вторият проблем беше, че те се съхраняваха в затворена система, така че не можех да използвам нито един от любимите си инструменти за анализ на данните.
Трябваше да разбера как да улесня достъпа ни и да добавя някаква стойност, като вградя този източник на данни в някои от нашите решения за потребителско изживяване. След като помислих за това известно време, реших да проектирам тръбопровод, за да накарам тези данни в облачна база данни, така че екипът и аз да имаме достъп до тях и да започнем да генерираме някои прозрения. След като завърших специализация Data Engineering в Coursera преди известно време, нямах търпение да използвам някои от инструментите от курса в проект.
Така че поставянето на данните в облачна база данни изглеждаше като разумен начин за разрешаване на първия ми проблем, но какво можех да направя за проблем номер 2? За щастие имаше начин да преместя тези данни в среда, където имах достъп до инструменти като Python и Google Cloud Platform (GCP). Това обаче беше дълъг процес, така че трябваше да направя нещо, което да ми позволи да продължа разработката, докато чаках прехвърлянето на данни да приключи. Решението, до което стигнах, беше да създам фалшиви данни с помощта на библиотеката фалшификатор в Python. Никога преди не бях използвал тази библиотека, но бързо осъзнах колко е полезна. Използването на този подход ми позволи да започна да пиша код и да тествам тръбопровода без реални данни.
С това казано, в тази публикация ще споделя как изградих тръбопровода, описан по-горе, използвайки някои от технологиите, налични в GCP. По-конкретно ще използвам Apache Beam (версия на Python), Dataflow, Pub/Sub и Big Query за събиране на потребителски регистрационни файлове, трансформиране на данните и прехвърлянето им в базата данни за допълнителен анализ. В моя случай имах нужда само от груповата функционалност на Beam, тъй като данните ми не идваха в реално време, така че Pub/Sub не беше необходим. Въпреки това ще се придържам към стрийминг версията, тъй като това е, което може да срещнете на практика.
Въведение в GCP и Apache Beam
Google Cloud Platform предоставя набор от наистина полезни инструменти за обработка на големи данни. Ето някои от инструментите, които ще използвам:
- е услуга за съобщения, която използва модел Издател-Абонат, който ни позволява да получаваме данни в реално време.
- е услуга, която улеснява изграждането на канали за данни и автоматично разрешава проблеми като мащабиране на инфраструктурата, което означава, че можем да се съсредоточим само върху писането на кода за нашия канал.
- е облачно хранилище за данни. Ако сте запознати с други SQL бази данни, BigQuery няма да отнеме много време, за да разберете.
- И накрая, ще използваме Apache Beam, като се фокусираме специално върху версията на Python, за да изградим нашия конвейер. Този инструмент ще ни позволи да създадем тръбопровод за поточна или групова обработка, който се интегрира с GCP. Той е особено полезен за паралелна обработка и е подходящ за задачи от типа Extract, Transform and Load (ETL), така че ако трябва да преместим данни от едно място на друго, докато извършваме трансформации или изчисления, Beam е добър избор.
В GCP има голямо разнообразие от инструменти, така че може да е трудно да ги следите всички и каква е тяхната цел, но ето обобщение на тях за справка.
Има голям брой налични инструменти в GCP, така че може да е трудно да ги обхванем всички, включително тяхната цел, но все пак резюме за справка.
Визуализация на нашия тръбопровод
Нека визуализираме компонентите на нашия тръбопровод на фигура 1. На високо ниво искаме да събираме потребителски данни в реално време, да ги обработваме и подаваме в BigQuery. Регистрационни файлове се създават, когато потребителите взаимодействат с продукта, като изпращат заявки до сървъра, които след това се регистрират. Тези данни могат да бъдат особено полезни за разбирането как потребителите взаимодействат с нашия продукт и дали той работи правилно. Като цяло тръбопроводът ще включва следните етапи:
Beam прави този процес много прост, независимо дали имаме стрийминг източник на данни или CSV файл и искаме да извършим групова обработка. По-късно ще видите, че има само минимални промени в кода, необходими за превключване между тях. Това е едно от предимствата на използването на Beam.

Фигура 1: Основен канал за данни: Източник:
Създаване на псевдоданни с Faker
Както споменах по-рано, поради ограничения достъп до данни, реших да създам псевдо данни в същия формат като действителните данни. Това беше наистина полезно упражнение, тъй като можех да пиша код и да тествам тръбопровода, докато чаках данни. Предлагам ви да разгледате Faker, ако искате да видите какво още може да предложи тази библиотека. Нашите потребителски данни ще бъдат като цяло подобни на примера по-долу. Въз основа на този формат можем да генерираме данни ред по ред, за да симулираме данни в реално време. Тези регистрационни файлове ни дават информация като дата, тип заявка, отговор от сървъра, IP адрес и др.
192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"
Въз основа на горния ред искаме да създадем нашата променлива LINEизползвайки 7-те променливи във фигурни скоби по-долу. Малко по-късно ще ги използваме и като имена на променливи в нашата схема на таблица.
LINE = """
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"
"""
Ако извършвахме пакетна обработка, кодът щеше да е много подобен, въпреки че ще трябва да създадем набор от проби за определен период от време. За да използваме фалшив, ние просто създаваме обект и извикваме методите, от които се нуждаем. По-специално Faker беше полезен за създаване на IP адреси, както и на уебсайтове. Използвах следните методи:
fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()
from faker import Faker
import time
import random
import os
import numpy as np
from datetime import datetime, timedelta
LINE = """
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"
"""
def generate_log_line():
fake = Faker()
now = datetime.now()
remote_addr = fake.ipv4()
time_local = now.strftime('%d/%b/%Y:%H:%M:%S')
request_type = random.choice(["GET", "POST", "PUT"])
request_path = "/" + fake.uri_path()
status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05])
body_bytes_sent = random.choice(range(5, 1000, 1))
http_referer = fake.uri()
http_user_agent = fake.user_agent()
log_line = LINE.format(
remote_addr=remote_addr,
time_local=time_local,
request_type=request_type,
request_path=request_path,
status=status,
body_bytes_sent=body_bytes_sent,
http_referer=http_referer,
http_user_agent=http_user_agent
)
return log_lineКрай на първата част.
В следващите дни ще споделим с вас продължението на статията, но сега традиционно очакваме коментари ;-).
Източник: www.habr.com
