Apache Kafka та потокова обробка даних за допомогою Spark Streaming
Привіт, Хабре! Сьогодні ми побудуємо систему, яка за допомогою Spark Streaming оброблятиме потоки повідомлень Apache Kafka і записуватиме результат обробки в хмарну базу даних AWS RDS.
Уявімо, що деяка кредитна організація ставить перед нами завдання обробки транзакцій, що входять, «на льоту» по всіх своїх філіях. Це може бути зроблено з метою оперативного розрахунку відкритої валютою позиції для казначейства, лімітів чи фінансового результату з операцій тощо.
Як реалізувати цей кейс без застосування магії та чарівних заклинань – читаємо під катом! Поїхали!
Безперечно, обробка великого масиву даних у реальному часі надає широкі можливості для використання в сучасних системах. Однією з найпопулярніших комбінацій для цього є тандем Apache Kafka та Spark Streaming, де Kafka створює потік пакетів вхідних повідомлень, а Spark Streaming обробляє ці пакети через заданий інтервал часу.
Для підвищення відмовостійкості програми будемо використовувати контрольні точки - чекпоїнти (checkpoints). За допомогою цього механізму, коли модулю Spark Streaming потрібно буде відновити втрачені дані, йому потрібно буде лише повернутися до останньої контрольної точки та відновити обчислення від неї.
Архітектура системи, що розробляється
Використовувані компоненти:
Апач Кафка - Це розподілена система обміну повідомленнями з публікацією та підпискою. Підходить як для автономного, так і онлайнового споживання повідомлень. Для запобігання втраті даних повідомлення Kafka зберігаються на диску та реплікуються усередині кластера. Система Kafka побудована поверх служби синхронізації ZooKeeper;
Apache Spark Streaming компонент Spark для обробки потокових даних. Модуль Spark Streaming побудований із застосуванням «мікропакетної» архітектури (micro-batch architecture), коли потік даних інтерпретується як безперервна послідовність невеликих пакетів даних. Spark Streaming приймає дані з різних джерел та об'єднує їх у невеликі пакети. Нові пакети створюються через регулярні проміжки часу. На початку кожного інтервалу часу створюється новий пакет, і будь-які дані, що надійшли протягом інтервалу, включаються в пакет. Наприкінці інтервалу збільшення пакета припиняється. Розмір інтервалу визначається параметром, що називається інтервал пакетування (batch interval);
Apache Spark SQL - поєднує реляційну обробку з функціональним програмуванням Spark. Під структурованими даними маються на увазі дані, мають схему, тобто єдиний набір полів всім записів. Spark SQL підтримує введення з безлічі джерел структурованих даних і завдяки наявності інформації про схему, він може ефективно витягувати тільки необхідні поля записів, а також надає API-інтерфейси DataFrame;
AWS RDS — це порівняно недорога хмарна реляційна база даних, веб-сервіс, який спрощує налаштування, експлуатацію та масштабування, адмініструється безпосередньо Amazon.
Встановлення та запуск сервера Kafka
Перед безпосереднім використанням Kafka необхідно переконатися в наявності Java, т.к. для роботи використовується JVM:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Наступний крок – опціональний. Справа в тому, що параметри за замовчуванням не дозволяють повноцінно використовувати всі можливості Apache Kafka. Наприклад, видаляти тему, категорію, групу, на які можуть бути опубліковані повідомлення. Щоб змінити це, відредагуємо конфігураційний файл:
vim ~/kafka/config/server.properties
Додайте до кінця файлу наступне:
delete.topic.enable = true
Перед запуском сервера Kafka необхідно стартувати сервер ZooKeeper, будемо використовувати допоміжний скрипт, який поставляється разом з дистрибутивом Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Після того, як ZooKeeper успішно стартував, в окремому терміналі запускаємо сервер Kafka:
Упустимо моменти тестування продюсера та консьюмера для новоствореного топіка. Докладніше про те, як можна протестувати відправлення та прийом повідомлень, написано в офіційній документації. Send some messages. Ну, а ми переходимо до написання продюсера на Python з використанням KafkaProducer API.
Написання продюсера
Продюсер генеруватиме випадкові дані — по 100 повідомлень кожну секунду. Під випадковими даними розумітимемо словник, що складається з трьох полів:
Філія - Найменування точки продажів кредитної організації;
Валюта - Валюта угоди;
сума - Сума угоди. Сума буде позитивною кількістю, якщо це купівля валюти Банком, і негативною — якщо продаж.
Далі, використовуючи метод send, відправляємо повідомлення на сервер, у потрібний нам топік, у форматі JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
При запуску скрипту отримуємо у терміналі такі повідомлення:
Це означає, що все працює, як ми хотіли, — продюсер генерує і відправляє повідомлення в потрібний нам топік.
Наступним кроком буде встановлення Spark та обробка цього потоку повідомлень.
Встановлення Apache Spark
Apache Spark — це універсальна та високопродуктивна кластерна обчислювальна платформа.
За продуктивністю Spark перевершує популярні реалізації моделі MapReduce, попутно забезпечуючи підтримку ширшого діапазону типів обчислень, включаючи інтерактивні запити та потокову обробку. Швидкість відіграє важливу роль при обробці великих обсягів даних, тому що саме швидкість дозволяє працювати в інтерактивному режимі, не витрачаючи хвилин або годин на очікування. Одна з найважливіших переваг Spark, що забезпечують таку високу швидкість, - здатність виконувати обчислення в пам'яті.
Цей фреймворк написаний на Scala, тому необхідно встановити її насамперед:
sudo apt-get install scala
Завантажуємо з офіційного сайту дистрибутив Spark:
Т.к. даний приклад розбирається виключно з освітньою метою, будемо використовувати безкоштовний сервер «на мінімалках» (Free Tier):
Далі, ставимо галочку в блоці Free Tier, і після цього нам автоматично буде запропоновано інстанс класу t2.micro - хоч і слабенький, але безкоштовний і цілком підійде для нашого завдання:
Далі йдуть дуже важливі речі: найменування інстансу БД, ім'я майстер-користувача та його пароль. Назвемо інстанст: myHabrTest, майстер-користувач: хабр, пароль: habr12345 та натискаємо на кнопку Next:
На наступній сторінці знаходяться параметри, що відповідають за доступність нашого сервера БД ззовні (Public accessibility) та доступність портів:
Давайте створимо нове налаштування для VPC security group, яка дозволить ззовні звертатися до нашого сервера БД через порт 5432 (PostgreSQL).
Перейдемо в окремому вікні браузера до консолі AWS у розділі VPC Dashboard -> Security Groups -> Create security group:
Задаємо ім'я для Security group - PostgreSQL, опис, вказуємо до якої VPC ця група має бути асоційована і натискаємо кнопку Create:
Заповнюємо для новоствореної групи Inbound rules для порту 5432, як показано на малюнку нижче. Вручну порт можна не вказувати, а вибрати PostgreSQL з списку Type, що розкривається.
Строго кажучи, значення ::/0 означає доступність вхідного трафіку для сервера з усього світу, що канонічно не зовсім вірно, але для аналізу прикладу дозволимо собі використовувати такий підхід:
Повертаємося до сторінки браузера, де у нас відкрито "Configure advanced settings" і вибираємо у розділі VPC security groups -> Choose existing VPC security groups -> PostgreSQL:
Далі, у розділі Database options -> Database name -> задаємо ім'я - habrDB.
Інші параметри, за винятком хіба що відключення бекапа (backup retention period — 0 days), моніторингу та Performance Insights, можемо залишити за замовчуванням. Натискаємо на кнопку Create database:
Обробник потоків
Завершальним етапом буде розробка Spark-джоби, яка кожні дві секунди оброблятиме нові дані, що прийшли від Kafka і заноситимуть результат до бази даних.
Як було зазначено вище, контрольні точки (сheckpoints) - це основний механізм SparkStreaming, який повинен бути налаштований для забезпечення відмовостійкості. Будемо використовувати контрольні точки і, у разі падіння процедури, модулю Spark Streaming для відновлення втрачених даних потрібно буде лише повернутися до останньої контрольної точки та відновити обчислення від неї.
Контрольну точку можна увімкнути, встановивши каталог у стійкій, надійної файлової системі (наприклад, HDFS, S3 і т. д.), в якій буде збережена інформація контрольної точки. Це робиться за допомогою, наприклад:
streamingContext.checkpoint(checkpointDirectory)
У нашому прикладі будемо використовувати наступний підхід, а саме, якщо checkpointDirectory існує, то контекст буде відтворено з даних контрольної точки. Якщо каталог не існує (тобто виконується вперше), то викликається функція functionToCreateContext для створення нового контексту та налаштування DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Створюємо об'єкт DirectStream з метою підключення до топіка "transaction" за допомогою методу createDirectStream бібліотеки KafkaUtils:
Використовуючи Spark SQL, робимо нескладне угруповання та виводимо результат у консоль:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Отримання тексту запиту та запуск його через Spark SQL:
А потім зберігаємо отримані агреговані дані в таблицю AWS RDS. Щоб зберегти результати агрегації в таблицю бази даних, будемо використовувати метод write об'єкта DataFrame:
Декілька слів про налаштування підключення до AWS RDS. Користувача та пароль до нього ми створювали на етапі «Розгортання AWS PostgreSQL». Як url сервер баз даних слід використовувати Endpoint, який відображається в розділі Connectivity & security:
З метою коректної зв'язки Spark і Kafka слід запускати джобу через smark-submit з використанням артефакту spark-streaming-kafka-0-8_2.11. Додатково застосуємо також артефакт для взаємодії з базою даних PostgreSQL, їх передаватимемо через —packages.
Для гнучкості скрипта, винесемо як вхідні параметри також найменування сервера повідомлень і топік, з якого хочемо отримувати дані.
Отже, настав час запустити та перевірити працездатність системи:
Все вийшло! Як видно на малюнку нижче - в процесі роботи програми нові результати агрегації виводяться кожні 2 секунди, тому що ми встановили інтервал пакетування рівним 2 секунд, коли створювали об'єкт StreamingContext:
Далі, робимо нехитрий запит до бази даних, щоб перевірити наявність записів у таблиці transaction_flow:
Висновок
У цій статті було розглянуто приклад потокової обробки інформації з використанням Spark Streaming у зв'язці з Apache Kafka та PostgreSQL. Зі зростанням обсягів даних із різних джерел, складно переоцінити практичну цінність Spark Streaming для створення потокових додатків та додатків, що діють у масштабі реального часу.
Повний вихідний код ви можете знайти в моєму репозиторії на GitHub.
Із задоволенням готовий обговорити цю статтю, чекаю на Ваші коментарі, а також сподіваюся на конструктивну критику всіх небайдужих читачів.
Бажаю успіхів!
Ps. Спочатку планувалося використовувати локальну БД PostgreSQL, але з огляду на мою любов до AWS я вирішив винести базу даних у хмару. У наступній статті на цю тему я покажу, як реалізувати цілком описану систему в AWS за допомогою AWS Kinesis та AWS EMR. Слідкуйте за новинами!