Apache Kafka и поточно предаване на данни с Spark Streaming
Хей Хабр! Днес ще изградим система, която ще обработва потоци от съобщения на Apache Kafka с помощта на Spark Streaming и ще запише резултата от обработката в облачната база данни на AWS RDS.
Нека си представим, че определена кредитна институция поставя пред нас задачата да обработва входящи транзакции "в движение" за всички свои клонове. Това може да се направи с цел бързо изчисляване на отворената валутна позиция за хазната, лимити или финансов резултат по транзакции и др.
Как да приложите този случай без използването на магия и магически заклинания - прочетете под разреза! Отивам!
Разбира се, обработката в реално време на голямо количество данни предоставя широки възможности за използване в съвременните системи. Една от най-популярните комбинации за това е тандемът на Apache Kafka и Spark Streaming, където Kafka създава поток от входящи пакети съобщения, а Spark Streaming обработва тези пакети на определен интервал от време.
За да подобрим устойчивостта на грешки на приложението, ще използваме контролни точки - контролни точки. С този механизъм, когато модулът Spark Streaming трябва да възстанови изгубени данни, той трябва само да се върне към последната контролна точка и да възобнови изчисленията оттам.
Архитектурата на разработената система
Използвани компоненти:
Апачи Кафка е разпределена система за публикуване и абониране на съобщения. Подходящ както за офлайн, така и за онлайн консумация на съобщения. За да се предотврати загуба на данни, Kafka съобщенията се съхраняват на диска и се репликират в рамките на клъстера. Системата Kafka е изградена върху услугата за синхронизация ZooKeeper;
Apache Spark Streaming - Spark компонент за обработка на поточни данни. Модулът Spark Streaming е изграден с помощта на микропакетна архитектура, когато поток от данни се интерпретира като непрекъсната последователност от малки пакети данни. Spark Streaming взема данни от различни източници и ги комбинира в малки партиди. Нови пакети се създават на редовни интервали. В началото на всеки интервал от време се създава нов пакет и всички данни, получени през този интервал, се включват в пакета. В края на интервала растежът на пакета спира. Размерът на интервала се определя от параметър, наречен пакетен интервал;
Apache Spark SQL - Комбинира релационна обработка с функционално програмиране на Spark. Структурираните данни се отнасят до данни, които имат схема, тоест единичен набор от полета за всички записи. Spark SQL поддържа въвеждане от различни източници на структурирани данни и, поради наличието на информация за схемата, може ефективно да извлича само необходимите полета от записи, а също така предоставя DataFrame API;
AWS RDS е сравнително евтина релационна база данни, базирана на облак, уеб услуга, която опростява настройката, работата и мащабирането, администрирана директно от Amazon.
Инсталиране и стартиране на Kafka сървър
Преди да използвате директно Kafka, трябва да се уверите, че имате Java, т.к JVM се използва за работа:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Следващата стъпка не е задължителна. Факт е, че настройките по подразбиране не ви позволяват да използвате напълно всички функции на Apache Kafka. Например изтриване на тема, категория, група, в която могат да се публикуват съобщения. За да промените това, нека редактираме конфигурационния файл:
vim ~/kafka/config/server.properties
Добавете следното в края на файла:
delete.topic.enable = true
Преди да стартирате сървъра Kafka, трябва да стартирате сървъра ZooKeeper, ние ще използваме помощния скрипт, който идва с дистрибуцията на Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
След като ZooKeeper стартира успешно, стартираме Kafka сървъра в отделен терминал:
Нека пропуснем моментите на тестване на производителя и потребителя за новосъздадената тема. Повече подробности за това как можете да тествате изпращането и получаването на съобщения са написани в официалната документация - Изпратете няколко съобщения. Е, преминаваме към писане на продуцент в Python, използвайки KafkaProducer API.
Продуцентско писане
Производителят ще генерира произволни данни - 100 съобщения всяка секунда. Под произволни данни имаме предвид речник, състоящ се от три полета:
Клон — име на пункта за продажба на кредитната институция;
Валута — валута на сделката;
Количество - сума на транзакцията. Сумата ще бъде положителна, ако е покупка на валута от Банката, и отрицателна, ако е продажба.
След това, използвайки метода за изпращане, изпращаме съобщение до сървъра, до темата, от която се нуждаем, във формат JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Когато изпълняваме скрипта, получаваме следните съобщения в терминала:
Това означава, че всичко работи както искахме - производителят генерира и изпраща съобщения до темата, от която се нуждаем.
Следващата стъпка е да инсталирате Spark и да обработите този поток от съобщения.
Инсталиране на Apache Spark
Апачи Спарк е многофункционална и високопроизводителна клъстерна изчислителна платформа.
Spark превъзхожда популярните реализации на модела MapReduce по отношение на производителността, като същевременно осигурява поддръжка за по-широк набор от типове изчисления, включително интерактивни заявки и стрийминг. Скоростта играе важна роля при обработката на големи количества данни, тъй като именно скоростта ви позволява да работите интерактивно, без да прекарвате минути или часове в чакане. Една от най-силните страни на Spark за предоставяне на тази скорост е способността му да извършва изчисления в паметта.
Тази рамка е написана на Scala, така че първо трябва да я инсталирате:
sudo apt-get install scala
Изтеглете дистрибуцията на Spark от официалния уебсайт:
Изпълнете командата по-долу, след като направите промени в bashrc:
source ~/.bashrc
Внедряване на AWS PostgreSQL
Остава да разширим базата данни, където ще попълваме обработената информация от потоците. За целта ще използваме услугата AWS RDS.
Отидете на AWS конзолата -> AWS RDS -> Бази данни -> Създаване на база данни:
Изберете PostgreSQL и щракнете върху бутона Напред:
защото този пример е анализиран единствено за образователни цели, ние ще използваме безплатен сървър „като минимум“ (Free Tier):
След това проверете блока Free Tier и след това автоматично ще ни бъде предложен екземпляр от класа t2.micro - макар и слаб, но безплатен и доста подходящ за нашата задача:
Следват много важни неща: името на инстанцията на DB, името на главния потребител и неговата парола. Нека извикаме екземпляра: myHabrTest, главен потребител: хабр, парола: habr12345 и щракнете върху бутона Напред:
Следващата страница съдържа параметрите, отговорни за достъпността на нашия сървър на база данни отвън (публична достъпност) и наличността на портове:
Нека създадем нова настройка за групата за сигурност на VPC, която ще позволи външен достъп до нашия сървър на база данни през порт 5432 (PostgreSQL).
Нека отидем в отделен прозорец на браузъра към конзолата на AWS в таблото за управление на VPC -> Групи за сигурност -> раздел Създаване на група за сигурност:
Задаваме името на групата за сигурност - PostgreSQL, описание, посочваме с кой VPC трябва да бъде свързана тази група и щракнете върху бутона Създаване:
Попълваме новосъздадената група Inbound rules за порт 5432, както е показано на снимката по-долу. Не можете да посочите порта ръчно, но изберете PostgreSQL от падащия списък Тип.
Строго погледнато, стойността ::/0 означава наличието на входящ трафик за сървъра от цял свят, което не е съвсем вярно канонично, но за да анализираме примера, нека използваме този подход:
Връщаме се на страницата на браузъра, където имаме отворено „Конфигуриране на разширени настройки“ и избираме VPC групи за сигурност -> Изберете съществуващи VPC групи за сигурност -> PostgreSQL в секцията:
След това в секцията Опции за база данни -> Име на база данни -> задайте името - habrDB.
Можем да оставим останалите параметри, с изключение на деактивирането на архивиране (период на съхранение на архива - 0 дни), мониторинг и Performance Insights, по подразбиране. Кликнете върху бутона Създайте база данни:
Обработчик на поток
Последният етап ще бъде разработването на задача на Spark, която ще обработва нови данни от Kafka на всеки две секунди и ще въвежда резултата в базата данни.
Както беше отбелязано по-горе, контролните точки са основният механизъм в SparkStreaming, който трябва да бъде конфигуриран, за да осигури толерантност към грешки. Ще използваме контролни точки и в случай на повреда на процедурата, модулът Spark Streaming ще трябва само да се върне към последната контролна точка и да възобнови изчисленията от нея, за да възстанови изгубените данни.
Контролна точка може да бъде активирана чрез задаване на директория на устойчива на грешки, надеждна файлова система (напр. HDFS, S3 и т.н.), където ще се съхранява информацията за контролната точка. Това става например с:
streamingContext.checkpoint(checkpointDirectory)
В нашия пример ще използваме следния подход, а именно, ако checkpointDirectory съществува, тогава контекстът ще бъде пресъздаден от данните за контролната точка. Ако директорията не съществува (т.е. тя се изпълнява за първи път), тогава функцията functionToCreateContext се извиква, за да създаде нов контекст и да настрои DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Създаваме DirectStream обект, за да се свържем с темата „транзакция“, като използваме метода createDirectStream на библиотеката KafkaUtils:
Използвайки Spark SQL, ние правим просто групиране и извеждаме резултата на конзолата:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Получаване на тялото на заявката и изпълнението й през Spark SQL:
И след това запазваме получените обобщени данни в таблица в AWS RDS. За да запазим резултатите от агрегирането в таблица на база данни, ще използваме метода за запис на обекта DataFrame:
Няколко думи за настройка на връзка към AWS RDS. Създадохме потребителя и паролата за него в стъпката „Внедряване на AWS PostgreSQL“. Като url на сървъра на базата данни трябва да използвате крайната точка, която се показва в раздела Свързване и сигурност:
За да свържете правилно Spark и Kafka, трябва да стартирате заданието чрез smark-submit, като използвате артефакта spark-streaming-kafka-0-8_2.11. Освен това ще използваме и артефакт за взаимодействие с базата данни на PostgreSQL, ще ги предаваме през --packages.
За гъвкавост на скрипта ще извадим името на сървъра за съобщения и темата, от която искаме да получаваме данни, като входни параметри.
И така, време е да стартирате и тествате системата:
Всичко се получи! Както можете да видите на снимката по-долу, докато приложението работи, нови резултати от агрегацията се показват на всеки 2 секунди, тъй като зададохме интервала на групиране на 2 секунди, когато създадохме обекта StreamingContext:
След това правим проста заявка към базата данни, за да проверим за записи в таблицата транзакционен_поток:
Заключение
В тази статия беше разгледан пример за обработка на поточно предаване на информация с помощта на Spark Streaming във връзка с Apache Kafka и PostgreSQL. С нарастването на обемите данни от различни източници е трудно да се надцени практическата стойност на Spark Streaming за създаване на приложения в реално време и стрийминг.
Можете да намерите пълния изходен код в моето хранилище на GitHub.
Щастлив съм да обсъдя тази статия, очаквам вашите коментари и също така се надявам на градивна критика от всички заинтересовани читатели.
Желая ви успех!
Пс. Първоначално беше планирано да се използва локална база данни PostgreSQL, но предвид любовта ми към AWS реших да преместя базата данни в облака. В следващата статия по тази тема ще ви покажа как да внедрите цялата описана по-горе система в AWS с помощта на AWS Kinesis и AWS EMR. Следете новините!