Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Привіт, Хабре!

Нагадуємо, що слідом за книгою про Кафка ми випустили не менш цікаву працю про бібліотеку Kafka Streams API.

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Поки що спільнота тільки осягає межі можливостей цього потужного інструменту. Так, нещодавно вийшла стаття, з перекладом якої ми хочемо познайомити вас. На власному досвіді автор розповідає, як зробити з Kafka Streams розподілене сховище даних. Приємного читання!

Бібліотека Apache Потоки Кафки по всьому світу використовується в ентерпрайзі для розподіленої потокової обробки поверх Apache Kafka. Один з недооцінених аспектів цього фреймворку полягає в тому, що він дозволяє зберігати локальний стан, вироблений на основі потокової обробки.

У цій статті я розповім, як у нашій компанії вдалося вигідно задіяти цю можливість при розробці продукту безпеки хмарних додатків. За допомогою Kafka Streams ми створили мікросервіси з станом, що розділяється, кожен з яких служить нам відмовостійким і високодоступним джерелом достовірної інформації про стан об'єктів в системі. Для нас це крок вперед як щодо надійності, так і зручності підтримки.

Якщо вас цікавить альтернативний підхід, що дає змогу використовувати єдину центральну базу даних для підтримки формального стану ваших об'єктів – почитайте, буде цікаво…

Чому ми вважали, що настав час змінювати наші підходи до роботи з станом, що розділяється.

Нам потрібно підтримувати стан різних об'єктів, спираючись на звіти агентів (наприклад: чи піддавався сайт атаці)? До переходу на Kafka Streams ми часто покладалися керувати станом на єдину центральну базу даних (+ сервісний API). Такий підхід має свої недоліки: у датаінтенсивних ситуаціях підтримка узгодженості та синхронізації перетворюється на справжній виклик. База даних може стати вузьким місцем, або опинятися в стані гонки і страждати від непередбачуваності.

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 1: типовий сценарій з поділом стану, що зустрічався на до переходу на
Kafka та Kafka Streams: агенти повідомляють свої уявлення через API, оновлений стан розраховується через центральну базу даних

Знайомтесь з Kafka Streams - тепер стало просто створювати мікросервіси з станом, що розділяється.

Приблизно рік тому ми вирішили добре переглянути наші сценарії роботи з станом, що розділяється, щоб розібратися з такими проблемами. Відразу ж вирішили спробувати Kafka Streams – відомо, наскільки вона масштабована, високодоступна і стійка до відмови, який багатий у неї потоковий функціонал (перетворення, в тому числі, зі збереженням стану). Саме те, що нам потрібно, не кажучи вже про те, наскільки зріла та надійна система обміну повідомленнями склалася в Kafka.

Кожен із створених нами мікросервісів із збереженням стану будувався на основі інстансу Kafka Streams із досить простою топологією. Він складався з 1) джерела 2) процесора з постійним сховищем ключів та значень 3) стоку:

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 2: топологія наших потокових інстансів, що задаються за замовчуванням, для мікросервісів із збереженням стану. Зверніть увагу: тут є сховище, в якому знаходяться метадані про планування.

При такому новому підході агенти складають повідомлення, що подаються у вихідний топік, а споживачі - скажімо, сервіс поштових повідомлень - приймають обчислений стан, що розділяється через стік (вихідний топік).

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 3: новий приклад потоку завдань для сценарію з мікросервісами, що розділяються: 1) агент породжує повідомлення, що надходить у вихідний топік Kafka; 2) мікросервіс з станом, що розділяється (використовує Kafka Streams) обробляє його і записує обчислений стан в кінцевий топік Kafka; після чого 3) споживачі приймають новий стан

Гей, а це вбудоване сховище ключів та значень справді дуже корисно!

Як згадувалося вище, наша топологія з станом, що розділяється, містить сховище ключів і значень. Ми знайшли кілька варіантів використання, і два з них описані нижче.

Варіант #1: використання сховища ключів та значень при обчисленнях

Наше перше сховище ключів та значень містило допоміжні дані, які були потрібні нам для обчислень. Наприклад, у деяких випадках стан, що розділявся, визначався за принципом «більшості голосів». У сховищі можна було тримати останні звіти агентів про стан деякого об'єкта. Потім, отримуючи новий звіт від того чи іншого агента, ми могли зберегти його, витягти зі сховища звіти всіх інших агентів про стан того самого об'єкта і повторити обчислення.
Нижче на ілюстрації 4 показано, як ми відкривали доступ до сховища ключів і значень методу оброблення процесора, так що потім можна було обробити нове повідомлення.

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 4: відкриваємо доступ до сховища ключів і значень для обробного методу процесора (після цього у кожному сценарії, що працює з станом, що розділяється, необхідно реалізувати метод doProcess)

Варіант #2: створення CRUD API поверх Kafka Streams

Налагодивши наш базовий потік завдань, ми почали пробувати написати RESTful CRUD API для наших мікросервісів з станом, що розділяється. Ми хотіли, щоб можна було вилучати стан деяких або всіх об'єктів, а також встановлювати або видаляти стан об'єкта (це корисно за підтримки серверної частини).

Для підтримки всіх API Get State, щоразу, коли нам потрібно було заново обчислювати стан при обробці, ми надовго вкладали його у вбудоване сховище ключів та значень. У такому випадку стає досить просто реалізувати такий API за допомогою єдиного екземпляра Kafka Streams, як показано в наведеному нижче листингу:

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 5: використання вбудованого сховища ключів та значень для отримання передрахованого стану об'єкта

Оновлення стану об'єкта через API також легко реалізувати. В принципі, для цього потрібно тільки створити прод'юсер Kafka, а з його допомогою зробити запис, в якому міститься новий стан. Так гарантується, що всі повідомлення, згенеровані через API, будуть оброблятися так само, як і ті, що надходять від інших прод'юсерів (напр. агентів).

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 6: встановити стан об'єкта можна за допомогою прод'юсера Kafka

Невелике ускладнення: у Kafka безліч партицій

Далі ми хотіли розподілити навантаження, пов'язане з обробкою, і покращити доступність, надавши на кожен сценарій кластер мікросервісів з станом, що розділяється. Налаштування далося нам простіше простого: після того, як ми налаштували всі інстанси так, щоб вони працювали з одним і тим самим ID програми (і з тими ж серверами початкового завантаження), практично все інше робилося автоматично. Ми також задали, що кожен вихідний топік складатиметься з кількох партицій, щоб кожному інстансу можна було привласнити підмножину таких партицій.

Також згадаю, що тут у порядку речей робити резервну копію сховища станів, щоб, наприклад, у разі відновлення після відмови переносити цю копію на іншу інстанс. На кожне сховище станів у Kafka Streams створюється топік, що реплікується, з журналом змін (в якому відстежуються локальні оновлення). Таким чином, Kafka постійно підстрахування сховища станів. Тому у разі відмови тієї чи іншої інстансу Kafka Streams сховище станів може бути швидко відновлене іншою інстансі, куди перейдуть відповідні партиції. Наш тест показали, що це робиться за лічені секунди навіть якщо в сховищі знаходяться мільйони записів.

Переходячи від одного мікросервісу з станом, що розділяється до кластера мікросервісів, стає не настільки тривіально реалізувати Get State API. У новій ситуації у сховищі станів кожного мікросервісу міститься лише частина загальної картини (об'єкти, чиї ключі відображалися на конкретну партицію). Доводилося визначати, на якому інстансі містився стан потрібного нам об'єкта, і ми робили це на основі метаданих потоків, як показано нижче:

Не тільки обробка: Як ми зробили з Kafka Streams розподілену базу даних, і що з цього вийшло

Ілюстрація 7: за допомогою метаданих потоків ми визначаємо, з якого інстансу вимагати стан необхідного об'єкта; подібний підхід застосовувався з GET ALL API

Основні висновки

Сховища станів у Kafka Streams де-факто можуть служити розподіленою базою даних,

  • постійно реплікується в Kafka
  • Поверх такої системи легко вишиковується CRUD API
  • Обробка множинних партицій виходить трохи складнішою
  • Також можна додати одне або кілька сховищ станів потокову топологію для зберігання допоміжних даних. Такий варіант може використовуватися для:
  • Довготривалого зберігання даних, необхідних для обчислень при потоковій обробці
  • Довготривале зберігання даних, які можуть бути корисними при наступній ініціалізації потокового інстансу
  • багато чого іншого…

Завдяки цим та іншим перевагам Kafka Streams чудово підходить для підтримки глобального стану у такій розподіленій системі як наша. Kafka Streams показала себе дуже надійною у продакшені (з моменту її розгортання ми практично не втрачали повідомлень), і ми впевнені, що цим її можливості не обмежуються!

Джерело: habr.com

Додати коментар або відгук