Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming
Прывітанне, Хабр! Сёння мы пабудуем сістэму, якая будзе пры дапамозе Spark Streaming апрацоўваць плыні паведамленняў Apache Kafka і запісваць вынік апрацоўкі ў хмарную базу дадзеных AWS RDS.
Уявім, што нейкая крэдытная арганізацыя ставіць перад намі задачу апрацоўкі ўваходзяць транзакцый "на ляту" па ўсіх сваіх філіялах. Гэта можа быць зроблена з мэтай аператыўнага разліку адкрытай валютай пазіцыі для казначэйства, лімітаў або фінансавага выніку па здзелках і г.д.
Як рэалізаваць гэты кейс без прымянення магіі і чароўных загавораў - чытаем пад катом! Паехалі!
Безумоўна, апрацоўка вялікага масіва дадзеных у рэальным часе дае шырокія магчымасці для выкарыстання ў сучасных сістэмах. Адной з найпапулярных камбінацый для гэтага з'яўляецца тандэм Apache Kafka і Spark Streaming, дзе Kafka - стварае струмень пакетаў уваходных паведамленняў, а Spark Streaming апрацоўвае гэтыя пакеты праз зададзены інтэрвал часу.
Для павышэння адмоваўстойлівасці прыкладання, будзем выкарыстоўваць кантрольныя кропкі - чэкпоінты (checkpoints). Пры дапамозе гэтага механізму, калі модулю Spark Streaming запатрабуецца аднавіць згубленыя дадзеныя, яму трэба будзе толькі вярнуцца да апошняй кантрольнай кропкі і аднавіць вылічэнні ад яе.
Архітэктура распрацоўванай сістэмы
Выкарыстоўваныя кампаненты:
Apache Kafka - Гэта размеркаваная сістэма абмену паведамленнямі з публікацыяй і падпіскай. Падыходзіць як для аўтаномнага, так і для анлайнавага спажывання паведамленняў. Для прадухілення страты дадзеных паведамленні Kafka захоўваюцца на дыску і рэпліцыруюцца ўнутры кластара. Сістэма Kafka пабудавана па-над службай сінхранізацыі ZooKeeper;
Apache Spark Streaming - кампанент Spark для апрацоўкі струменевых дадзеных. Модуль Spark Streaming пабудаваны з ужываннем «мікрапакетнай» архітэктуры (micro-batch architecture), калі струмень дадзеных інтэрпрэтуецца як бесперапынная паслядоўнасць маленькіх пакетаў дадзеных. Spark Streaming прымае дадзеныя з розных крыніц і аб'ядноўвае іх у невялікія пакеты. Новыя пакеты ствараюцца праз рэгулярныя інтэрвалы часу. У пачатку кожнага інтэрвалу часу ствараецца новы пакет, і любыя дадзеныя, якія паступілі на працягу гэтага інтэрвалу, уключаюцца ў пакет. У канцы інтэрвалу павелічэнне пакета спыняецца. Памер інтэрвалу вызначаецца параметрам, які завецца інтэрвал пакетавання (batch interval);
Apache Spark SQL - аб'ядноўвае рэляцыйную апрацоўку з функцыянальным праграмаваннем Spark. Пад структураванымі дадзенымі маюцца на ўвазе дадзеныя, якія маюць схему, гэта значыць адзіны набор палёў для ўсіх запісаў. Spark SQL падтрымлівае ўвод з мноства крыніц структураваных дадзеных і, дзякуючы наяўнасці інфармацыі аб схеме, ён можа эфектыўна здабываць толькі неабходныя палі запісаў, а таксама падае API-інтэрфейсы DataFrame;
AWS RDS – гэта параўнальна недарагая хмарная рэляцыйная база дадзеных, вэб-сэрвіс, які спрашчае наладу, эксплуатацыю і маштабаванне, адмініструецца непасрэдна Amazon.
Усталяванне і запуск сервера Kafka
Перад непасрэдным выкарыстаннем Kafka, неабходна пераканацца ў наяўнасці Java, т.я. для працы выкарыстоўваецца JVM:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Наступны крок - апцыянальны. Справа ў тым, што налады па змаўчанні не дазваляюць паўнавартасна выкарыстоўваць усе магчымасці Apache Kafka. Напрыклад, выдаляць тэму, катэгорыю, групу, на якія могуць быць апублікаваны паведамленні. Каб змяніць гэта, адрэдагуем файл канфігурацыі:
vim ~/kafka/config/server.properties
Дадайце ў канец файла наступнае:
delete.topic.enable = true
Перад запускам сервера Kafka, неабходна стартаваць сервер ZooKeeper, будзем выкарыстоўваць дапаможны скрыпт, які пастаўляецца разам з дыстрыбутывам Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Пасля таго, як ZooKeeper паспяхова стартаваў, у асобным тэрмінале запускаем сервер Kafka:
Выпусцім моманты тэставання прадзюсара і кансьюмера для ізноў створанага топіка. Больш падрабязна пра тое, як можна пратэставаць адпраўку і прыём паведамленняў, напісана ў афіцыйнай дакументацыі. Send some messages. Ну а мы пераходзім да напісання прадзюсара на Python з выкарыстаннем KafkaProducer API.
Напісанне прадзюсара
Прадзюсар будзе генераваць выпадковыя дадзеныя - па 100 паведамленняў кожную секунду. Пад выпадковымі дадзенымі будзем разумець слоўнік, які складаецца з трох палёў:
Філіял - найменне пункту продажаў крэдытнай арганізацыі;
Валюта - Валюта здзелкі;
сума - Сума здзелкі. Сума будзе станоўчым лікам, калі гэта купля валюты Банкам, і адмоўным - калі продаж.
Далей, выкарыстоўваючы метад send, адпраўляем паведамленне на сервер, у патрэбны нам топік, у фармаце JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Пры запуску скрыпту атрымліваем у тэрмінале наступныя паведамленні:
Гэта азначае, што ўсё працуе як мы хацелі - прадзюсар генерыт і адпраўляе паведамленні ў патрэбны нам топік.
Наступным крокам будзе ўстаноўка Spark і апрацоўка гэтага патоку паведамленняў.
Ўстаноўка Apache Spark
Apache Spark - гэта ўніверсальная і высокапрадукцыйная кластарная вылічальная платформа.
Па прадукцыйнасці Spark пераўзыходзіць папулярныя рэалізацыі мадэлі MapReduce, адначасна забяспечваючы падтрымку шырэйшага дыяпазону тыпаў вылічэнняў, уключаючы інтэрактыўныя запыты і струменевую апрацоўку. Хуткасць гуляе важную ролю пры апрацоўцы вялікіх аб'ёмаў дадзеных, бо менавіта хуткасць дазваляе працаваць у інтэрактыўным рэжыме, не марнуючы хвіліны ці гадзіны на чаканне. Адно з найважных добрых якасцяў Spark, якія забяспечваюць гэтак высокую хуткасць, здольнасць выконваць вылічэнні ў памяці.
Дадзены фрэймворк напісаны на Scala, таму неабходна ўсталяваць яе ў першую чаргу:
Т.к. дадзены прыклад разбіраецца выключна ў адукацыйных мэтах, будзем выкарыстоўваць бясплатны сервер "на мінімалках" (Free Tier):
Далей, ставім галачку ў блоку Free Tier, і пасля гэтага нам аўтаматычна будзе прапанаваны інстанс класа t2.micro – хоць і слабенькі, але бясплатны і цалкам падыдзе для нашай задачы:
Следам ідуць вельмі важныя рэчы: найменне інстанса БД, імя майстар-карыстальніка і яго пароль. Назавем інстанст: myHabrTest, майстар-карыстальнік: хабр, пароль: хабр12345 і націскаем на кнопку Next:
На наступнай старонцы знаходзяцца параметры, якія адказваюць за даступнасць нашага сервера БД звонку (Public accessibility) і даступнасць партоў:
Давайце створым новую наладу для VPC security group, якая дазволіць звонку звяртацца да нашага сервера БД праз порт 5432 (PostgreSQL).
Пяройдзем у асобным акне браўзэра да кансолі AWS у падзел VPC Dashboard -> Security Groups -> Create security group:
Задаем імя для Security group - PostgreSQL, апісанне, паказваем да якой VPC дадзеная група павінна быць асацыяваная і націскаем кнопку Create:
Запаўняем для свежастворанай групы Inbound rules для порта 5432, як паказана на малюнку ніжэй. Уручную порт можна не паказваць, а абраць PostgreSQL з які расчыняецца спісу Type.
Строга кажучы, значэнне ::/0 азначае даступнасць уваходнага трафіку для сервера з усяго свету, што кананічна не зусім дакладна, але для разбору прыкладу дазволім сабе выкарыстоўваць такі падыход:
Вяртаемся да старонкі браўзэра, дзе ў нас адчынена "Configure advanced settings" і выбіраемы ў падзеле VPC security groups -> Choose existing VPC security groups -> PostgreSQL:
Далей, у падзеле Database options -> Database name -> задаем імя - habrDB.
Астатнія параметры, за выключэннем хіба што адключэння бэкапіравання (backup retention period – 0 days), маніторынгу і Performance Insights, можам пакінуць па змаўчанні. Націскаем на кнопку Стварыць базу дадзеных:
Апрацоўшчык патокаў
Завяршальным этапам будзе распрацоўка Spark-джобы, якая будзе кожныя дзве секунды апрацоўваць новыя дадзеныя, якія прыйшлі ад Kafka і ўносіць вынік у базу дадзеных.
Як было адзначана вышэй, кантрольныя кропкі (сheckpoints) - гэта асноўны механізм у SparkStreaming, які павінен быць настроены для забеспячэння адмоваўстойлівасці. Будзем выкарыстоўваць кантрольныя кропкі і, у выпадку падзення працэдуры, модулю Spark Streaming для аднаўлення страчаных дадзеных трэба будзе толькі вярнуцца да апошняй кантрольнай кропкі і аднавіць вылічэнні ад яе.
Кантрольную кропку можна ўключыць, усталяваўшы каталог у адмоваўстойлівай, надзейнай файлавай сістэме (напрыклад, HDFS, S3 і т. Д.), у якой будзе захавана інфармацыя кантрольнай кропкі. Гэта робіцца з дапамогай, напрыклад:
streamingContext.checkpoint(checkpointDirectory)
У нашым прыкладзе будзем выкарыстоўваць наступны падыход, а менавіта, калі checkpointDirectory існуе, то кантэкст будзе ўзноўлены з дадзеных кантрольнай кропкі. Калі каталог не існуе (г.зн. выконваецца ў першы раз), тое выклікаецца функцыя functionToCreateContext для стварэння новага кантэксту і налады DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Ствараем аб'ект DirectStream з мэтай падлучэння да топіка "transaction" пры дапамозе метаду createDirectStream бібліятэкі KafkaUtils:
Выкарыстоўваючы Spark SQL, які робіцца нескладаную групоўку і выводны вынік у кансоль:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Атрыманне тэксту запыту і запуск яго праз Spark SQL:
А затым захоўваем атрыманыя агрэгаваныя дадзеныя ў табліцу ў AWS RDS. Каб захаваць вынікі агрэгацыі ў табліцу базы дадзеных, будзем выкарыстоўваць метад write аб'екта DataFrame:
Некалькі слоў аб наладзе падлучэння да AWS RDS. Карыстальніка і пароль да яго мы стваралі на кроку "Разгортванне AWS PostgreSQL". У якасці url сервера баз дадзеных варта выкарыстоўваць Endpoint, які адлюстроўваецца ў падзеле Connectivity & security:
У мэтах карэктнай звязкі Spark і Kafka, варта запускаць джобу праз smark-submit з выкарыстаннем артэфакта spark-streaming-kafka-0-8_2.11. Дадаткова ўжыем таксама артэфакт для ўзаемадзеяння з базай дадзеных PostgreSQL, іх будзем перадаваць праз -packages.
Для гнуткасці скрыпту, вынесем у якасці ўваходных параметраў таксама найменне сервера паведамленняў і топік, з якога жадаем атрымліваць дадзеныя.
Такім чынам, прыйшоў час запусціць і праверыць працаздольнасць сістэмы:
Усё атрымалася! Як відаць на малюнку ніжэй - падчас працы прыкладання новыя вынікі агрэгацыі выводзяцца кожныя 2 секунды, таму што мы ўсталявалі інтэрвал пакетавання роўным 2 секундам, калі стваралі аб'ект StreamingContext:
Далей, які робіцца няхітры запыт да базы дадзеных, каб праверыць наяўнасць запісаў у табліцы transaction_flow:
Заключэнне
У дадзеным артыкуле быў разгледжаны прыклад струменевай апрацоўкі інфармацыі з выкарыстаннем Spark Streaming у звязку з Apache Kafka і PostgreSQL. З ростам аб'ёмаў дадзеных з розных крыніц, складана пераацаніць практычную каштоўнасць Spark Streaming для стварэння струменевых прыкладанняў і прыкладанняў, якія дзейнічаюць у маштабе рэальнага часу.
Поўны зыходны код вы можаце знайсці ў маім рэпазітары на GitHub.
З задавальненнем гатовы абмеркаваць дадзены артыкул, чакаю Вашых каментароў, а таксама, спадзяюся на канструктыўную крытыку ўсіх неабыякавых чытачоў.
Жадаю поспехаў!
Ps. Першапачаткова планавалася выкарыстоўваць лакальную БД PostgreSQL, але улічваючы маё каханне да AWS, я вырашыў вынесці базу дадзеных у воблака. У наступным артыкуле па гэтай тэме я пакажу, як рэалізаваць цалкам апісаную сістэму ў AWS пры дапамозе AWS Kinesis і AWS EMR. Сачыце за навінамі!