Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Привіт, Хабре! Сьогодні ми побудуємо систему, яка за допомогою Spark Streaming оброблятиме потоки повідомлень Apache Kafka і записуватиме результат обробки в хмарну базу даних AWS RDS.

Уявімо, що деяка кредитна організація ставить перед нами завдання обробки транзакцій, що входять, «на льоту» по всіх своїх філіях. Це може бути зроблено з метою оперативного розрахунку відкритої валютою позиції для казначейства, лімітів чи фінансового результату з операцій тощо.

Як реалізувати цей кейс без застосування магії та чарівних заклинань – читаємо під катом! Поїхали!

Apache Kafka та потокова обробка даних за допомогою Spark Streaming
(Джерело картинки)

Запровадження

Безперечно, обробка великого масиву даних у реальному часі надає широкі можливості для використання в сучасних системах. Однією з найпопулярніших комбінацій для цього є тандем Apache Kafka та Spark Streaming, де Kafka створює потік пакетів вхідних повідомлень, а Spark Streaming обробляє ці пакети через заданий інтервал часу.

Для підвищення відмовостійкості програми будемо використовувати контрольні точки - чекпоїнти (checkpoints). За допомогою цього механізму, коли модулю Spark Streaming потрібно буде відновити втрачені дані, йому потрібно буде лише повернутися до останньої контрольної точки та відновити обчислення від неї.

Архітектура системи, що розробляється

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Використовувані компоненти:

  • Апач Кафка - Це розподілена система обміну повідомленнями з публікацією та підпискою. Підходить як для автономного, так і онлайнового споживання повідомлень. Для запобігання втраті даних повідомлення Kafka зберігаються на диску та реплікуються усередині кластера. Система Kafka побудована поверх служби синхронізації ZooKeeper;
  • Apache Spark Streaming компонент Spark для обробки потокових даних. Модуль Spark Streaming побудований із застосуванням «мікропакетної» архітектури (micro-batch architecture), коли потік даних інтерпретується як безперервна послідовність невеликих пакетів даних. Spark Streaming приймає дані з різних джерел та об'єднує їх у невеликі пакети. Нові пакети створюються через регулярні проміжки часу. На початку кожного інтервалу часу створюється новий пакет, і будь-які дані, що надійшли протягом інтервалу, включаються в пакет. Наприкінці інтервалу збільшення пакета припиняється. Розмір інтервалу визначається параметром, що називається інтервал пакетування (batch interval);
  • Apache Spark SQL - поєднує реляційну обробку з функціональним програмуванням Spark. Під структурованими даними маються на увазі дані, мають схему, тобто єдиний набір полів всім записів. Spark SQL підтримує введення з безлічі джерел структурованих даних і завдяки наявності інформації про схему, він може ефективно витягувати тільки необхідні поля записів, а також надає API-інтерфейси DataFrame;
  • AWS RDS — це порівняно недорога хмарна реляційна база даних, веб-сервіс, який спрощує налаштування, експлуатацію та масштабування, адмініструється безпосередньо Amazon.

Встановлення та запуск сервера Kafka

Перед безпосереднім використанням Kafka необхідно переконатися в наявності Java, т.к. для роботи використовується JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Створимо нового користувача для роботи з Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Далі завантажуємо дистрибутив з офіційного сайту Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Розпаковуємо завантажений архів:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Наступний крок – опціональний. Справа в тому, що параметри за замовчуванням не дозволяють повноцінно використовувати всі можливості Apache Kafka. Наприклад, видаляти тему, категорію, групу, на які можуть бути опубліковані повідомлення. Щоб змінити це, відредагуємо конфігураційний файл:

vim ~/kafka/config/server.properties

Додайте до кінця файлу наступне:

delete.topic.enable = true

Перед запуском сервера Kafka необхідно стартувати сервер ZooKeeper, будемо використовувати допоміжний скрипт, який поставляється разом з дистрибутивом Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Після того, як ZooKeeper успішно стартував, в окремому терміналі запускаємо сервер Kafka:

bin/kafka-server-start.sh config/server.properties

Створимо новий топік під назвою Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Переконаємося, що топік з необхідною кількістю партицій та реплікацією було створено:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Упустимо моменти тестування продюсера та консьюмера для новоствореного топіка. Докладніше про те, як можна протестувати відправлення та прийом повідомлень, написано в офіційній документації. Send some messages. Ну, а ми переходимо до написання продюсера на Python з використанням KafkaProducer API.

Написання продюсера

Продюсер генеруватиме випадкові дані — по 100 повідомлень кожну секунду. Під випадковими даними розумітимемо словник, що складається з трьох полів:

  • Філія - Найменування точки продажів кредитної організації;
  • Валюта - Валюта угоди;
  • сума - Сума угоди. Сума буде позитивною кількістю, якщо це купівля валюти Банком, і негативною — якщо продаж.

Код для продюсера виглядає так:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Далі, використовуючи метод send, відправляємо повідомлення на сервер, у потрібний нам топік, у форматі JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

При запуску скрипту отримуємо у терміналі такі повідомлення:

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Це означає, що все працює, як ми хотіли, — продюсер генерує і відправляє повідомлення в потрібний нам топік.
Наступним кроком буде встановлення Spark та обробка цього потоку повідомлень.

Встановлення Apache Spark

Apache Spark — це універсальна та високопродуктивна кластерна обчислювальна платформа.

За продуктивністю Spark перевершує популярні реалізації моделі MapReduce, попутно забезпечуючи підтримку ширшого діапазону типів обчислень, включаючи інтерактивні запити та потокову обробку. Швидкість відіграє важливу роль при обробці великих обсягів даних, тому що саме швидкість дозволяє працювати в інтерактивному режимі, не витрачаючи хвилин або годин на очікування. Одна з найважливіших переваг Spark, що забезпечують таку високу швидкість, - здатність виконувати обчислення в пам'яті.

Цей фреймворк написаний на Scala, тому необхідно встановити її насамперед:

sudo apt-get install scala

Завантажуємо з офіційного сайту дистрибутив Spark:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Розпаковуємо архів:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Додаємо шлях до Spark у bash-файл:

vim ~/.bashrc

Додаємо через редактор такі рядки:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Виконуємо команду нижче після внесення правок bashrc:

source ~/.bashrc

Розгортання AWS PostgreSQL

Залишилося розгорнути базу даних, куди заливатимемо оброблену інформацію з потоків. Для цього використовуватимемо сервіс AWS RDS.

Заходимо в консоль AWS -> AWS RDS -> Databases -> Create database:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Вибираємо PostgreSQL і натискаємо кнопку Next:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Т.к. даний приклад розбирається виключно з освітньою метою, будемо використовувати безкоштовний сервер «на мінімалках» (Free Tier):
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Далі, ставимо галочку в блоці Free Tier, і після цього нам автоматично буде запропоновано інстанс класу t2.micro - хоч і слабенький, але безкоштовний і цілком підійде для нашого завдання:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Далі йдуть дуже важливі речі: найменування інстансу БД, ім'я майстер-користувача та його пароль. Назвемо інстанст: myHabrTest, майстер-користувач: хабр, пароль: habr12345 та натискаємо на кнопку Next:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

На наступній сторінці знаходяться параметри, що відповідають за доступність нашого сервера БД ззовні (Public accessibility) та доступність портів:

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Давайте створимо нове налаштування для VPC security group, яка дозволить ззовні звертатися до нашого сервера БД через порт 5432 (PostgreSQL).
Перейдемо в окремому вікні браузера до консолі AWS у розділі VPC Dashboard -> Security Groups -> Create security group:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Задаємо ім'я для Security group - PostgreSQL, опис, вказуємо до якої VPC ця група має бути асоційована і натискаємо кнопку Create:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Заповнюємо для новоствореної групи Inbound rules для порту 5432, як показано на малюнку нижче. Вручну порт можна не вказувати, а вибрати PostgreSQL з списку Type, що розкривається.

Строго кажучи, значення ::/0 означає доступність вхідного трафіку для сервера з усього світу, що канонічно не зовсім вірно, але для аналізу прикладу дозволимо собі використовувати такий підхід:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Повертаємося до сторінки браузера, де у нас відкрито "Configure advanced settings" і вибираємо у розділі VPC security groups -> Choose existing VPC security groups -> PostgreSQL:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Далі, у розділі Database options -> Database name -> задаємо ім'я - habrDB.

Інші параметри, за винятком хіба що відключення бекапа (backup retention period — 0 days), моніторингу та Performance Insights, можемо залишити за замовчуванням. Натискаємо на кнопку Create database:
Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Обробник потоків

Завершальним етапом буде розробка Spark-джоби, яка кожні дві секунди оброблятиме нові дані, що прийшли від Kafka і заноситимуть результат до бази даних.

Як було зазначено вище, контрольні точки (сheckpoints) - це основний механізм SparkStreaming, який повинен бути налаштований для забезпечення відмовостійкості. Будемо використовувати контрольні точки і, у разі падіння процедури, модулю Spark Streaming для відновлення втрачених даних потрібно буде лише повернутися до останньої контрольної точки та відновити обчислення від неї.

Контрольну точку можна увімкнути, встановивши каталог у стійкій, надійної файлової системі (наприклад, HDFS, S3 і т. д.), в якій буде збережена інформація контрольної точки. Це робиться за допомогою, наприклад:

streamingContext.checkpoint(checkpointDirectory)

У нашому прикладі будемо використовувати наступний підхід, а саме, якщо checkpointDirectory існує, то контекст буде відтворено з даних контрольної точки. Якщо каталог не існує (тобто виконується вперше), то викликається функція functionToCreateContext для створення нового контексту та налаштування DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Створюємо об'єкт DirectStream з метою підключення до топіка "transaction" за допомогою методу createDirectStream бібліотеки KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Парсим вхідні дані у форматі JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Використовуючи Spark SQL, робимо нескладне угруповання та виводимо результат у консоль:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Отримання тексту запиту та запуск його через Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

А потім зберігаємо отримані агреговані дані в таблицю AWS RDS. Щоб зберегти результати агрегації в таблицю бази даних, будемо використовувати метод write об'єкта DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Декілька слів про налаштування підключення до AWS RDS. Користувача та пароль до нього ми створювали на етапі «Розгортання AWS PostgreSQL». Як url ​​сервер баз даних слід використовувати Endpoint, який відображається в розділі Connectivity & security:

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

З метою коректної зв'язки Spark і Kafka слід запускати джобу через smark-submit з використанням артефакту spark-streaming-kafka-0-8_2.11. Додатково застосуємо також артефакт для взаємодії з базою даних PostgreSQL, їх передаватимемо через —packages.

Для гнучкості скрипта, винесемо як вхідні параметри також найменування сервера повідомлень і топік, з якого хочемо отримувати дані.

Отже, настав час запустити та перевірити працездатність системи:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Все вийшло! Як видно на малюнку нижче - в процесі роботи програми нові результати агрегації виводяться кожні 2 секунди, тому що ми встановили інтервал пакетування рівним 2 секунд, коли створювали об'єкт StreamingContext:

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Далі, робимо нехитрий запит до бази даних, щоб перевірити наявність записів у таблиці transaction_flow:

Apache Kafka та потокова обробка даних за допомогою Spark Streaming

Висновок

У цій статті було розглянуто приклад потокової обробки інформації з використанням Spark Streaming у зв'язці з Apache Kafka та PostgreSQL. Зі зростанням обсягів даних із різних джерел, складно переоцінити практичну цінність Spark Streaming для створення потокових додатків та додатків, що діють у масштабі реального часу.

Повний вихідний код ви можете знайти в моєму репозиторії на GitHub.

Із задоволенням готовий обговорити цю статтю, чекаю на Ваші коментарі, а також сподіваюся на конструктивну критику всіх небайдужих читачів.

Бажаю успіхів!

Ps. Спочатку планувалося використовувати локальну БД PostgreSQL, але з огляду на мою любов до AWS я вирішив винести базу даних у хмару. У наступній статті на цю тему я покажу, як реалізувати цілком описану систему в AWS за допомогою AWS Kinesis та AWS EMR. Слідкуйте за новинами!

Джерело: habr.com

Додати коментар або відгук