Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Прывітанне, Хабр! Сёння мы пабудуем сістэму, якая будзе пры дапамозе Spark Streaming апрацоўваць плыні паведамленняў Apache Kafka і запісваць вынік апрацоўкі ў хмарную базу дадзеных AWS RDS.

Уявім, што нейкая крэдытная арганізацыя ставіць перад намі задачу апрацоўкі ўваходзяць транзакцый "на ляту" па ўсіх сваіх філіялах. Гэта можа быць зроблена з мэтай аператыўнага разліку адкрытай валютай пазіцыі для казначэйства, лімітаў або фінансавага выніку па здзелках і г.д.

Як рэалізаваць гэты кейс без прымянення магіі і чароўных загавораў - чытаем пад катом! Паехалі!

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming
(Крыніца карцінкі)

Увядзенне

Безумоўна, апрацоўка вялікага масіва дадзеных у рэальным часе дае шырокія магчымасці для выкарыстання ў сучасных сістэмах. Адной з найпапулярных камбінацый для гэтага з'яўляецца тандэм Apache Kafka і Spark Streaming, дзе Kafka - стварае струмень пакетаў уваходных паведамленняў, а Spark Streaming апрацоўвае гэтыя пакеты праз зададзены інтэрвал часу.

Для павышэння адмоваўстойлівасці прыкладання, будзем выкарыстоўваць кантрольныя кропкі - чэкпоінты (checkpoints). Пры дапамозе гэтага механізму, калі модулю Spark Streaming запатрабуецца аднавіць згубленыя дадзеныя, яму трэба будзе толькі вярнуцца да апошняй кантрольнай кропкі і аднавіць вылічэнні ад яе.

Архітэктура распрацоўванай сістэмы

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Выкарыстоўваныя кампаненты:

  • Apache Kafka - Гэта размеркаваная сістэма абмену паведамленнямі з публікацыяй і падпіскай. Падыходзіць як для аўтаномнага, так і для анлайнавага спажывання паведамленняў. Для прадухілення страты дадзеных паведамленні Kafka захоўваюцца на дыску і рэпліцыруюцца ўнутры кластара. Сістэма Kafka пабудавана па-над службай сінхранізацыі ZooKeeper;
  • Apache Spark Streaming - кампанент Spark для апрацоўкі струменевых дадзеных. Модуль Spark Streaming пабудаваны з ужываннем «мікрапакетнай» архітэктуры (micro-batch architecture), калі струмень дадзеных інтэрпрэтуецца як бесперапынная паслядоўнасць маленькіх пакетаў дадзеных. Spark Streaming прымае дадзеныя з розных крыніц і аб'ядноўвае іх у невялікія пакеты. Новыя пакеты ствараюцца праз рэгулярныя інтэрвалы часу. У пачатку кожнага інтэрвалу часу ствараецца новы пакет, і любыя дадзеныя, якія паступілі на працягу гэтага інтэрвалу, уключаюцца ў пакет. У канцы інтэрвалу павелічэнне пакета спыняецца. Памер інтэрвалу вызначаецца параметрам, які завецца інтэрвал пакетавання (batch interval);
  • Apache Spark SQL - аб'ядноўвае рэляцыйную апрацоўку з функцыянальным праграмаваннем Spark. Пад структураванымі дадзенымі маюцца на ўвазе дадзеныя, якія маюць схему, гэта значыць адзіны набор палёў для ўсіх запісаў. Spark SQL падтрымлівае ўвод з мноства крыніц структураваных дадзеных і, дзякуючы наяўнасці інфармацыі аб схеме, ён можа эфектыўна здабываць толькі неабходныя палі запісаў, а таксама падае API-інтэрфейсы DataFrame;
  • AWS RDS – гэта параўнальна недарагая хмарная рэляцыйная база дадзеных, вэб-сэрвіс, які спрашчае наладу, эксплуатацыю і маштабаванне, адмініструецца непасрэдна Amazon.

Усталяванне і запуск сервера Kafka

Перад непасрэдным выкарыстаннем Kafka, неабходна пераканацца ў наяўнасці Java, т.я. для працы выкарыстоўваецца JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Створым новага карыстальніка для працы з Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Далей спампоўваем дыстрыбутыў з афіцыйнага сайта Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Распакоўваем скачаны архіў:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Наступны крок - апцыянальны. Справа ў тым, што налады па змаўчанні не дазваляюць паўнавартасна выкарыстоўваць усе магчымасці Apache Kafka. Напрыклад, выдаляць тэму, катэгорыю, групу, на якія могуць быць апублікаваны паведамленні. Каб змяніць гэта, адрэдагуем файл канфігурацыі:

vim ~/kafka/config/server.properties

Дадайце ў канец файла наступнае:

delete.topic.enable = true

Перад запускам сервера Kafka, неабходна стартаваць сервер ZooKeeper, будзем выкарыстоўваць дапаможны скрыпт, які пастаўляецца разам з дыстрыбутывам Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Пасля таго, як ZooKeeper паспяхова стартаваў, у асобным тэрмінале запускаем сервер Kafka:

bin/kafka-server-start.sh config/server.properties

Створым новы топік пад назвай Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Пераканаемся, што топік з патрэбнай колькасцю партыцый і рэплікацыяй быў створаны:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Выпусцім моманты тэставання прадзюсара і кансьюмера для ізноў створанага топіка. Больш падрабязна пра тое, як можна пратэставаць адпраўку і прыём паведамленняў, напісана ў афіцыйнай дакументацыі. Send some messages. Ну а мы пераходзім да напісання прадзюсара на Python з выкарыстаннем KafkaProducer API.

Напісанне прадзюсара

Прадзюсар будзе генераваць выпадковыя дадзеныя - па 100 паведамленняў кожную секунду. Пад выпадковымі дадзенымі будзем разумець слоўнік, які складаецца з трох палёў:

  • Філіял - найменне пункту продажаў крэдытнай арганізацыі;
  • Валюта - Валюта здзелкі;
  • сума - Сума здзелкі. Сума будзе станоўчым лікам, калі гэта купля валюты Банкам, і адмоўным - калі продаж.

Код для прадзюсара выглядае наступным чынам:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Далей, выкарыстоўваючы метад send, адпраўляем паведамленне на сервер, у патрэбны нам топік, у фармаце JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Пры запуску скрыпту атрымліваем у тэрмінале наступныя паведамленні:

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Гэта азначае, што ўсё працуе як мы хацелі - прадзюсар генерыт і адпраўляе паведамленні ў патрэбны нам топік.
Наступным крокам будзе ўстаноўка Spark і апрацоўка гэтага патоку паведамленняў.

Ўстаноўка Apache Spark

Apache Spark - гэта ўніверсальная і высокапрадукцыйная кластарная вылічальная платформа.

Па прадукцыйнасці Spark пераўзыходзіць папулярныя рэалізацыі мадэлі MapReduce, адначасна забяспечваючы падтрымку шырэйшага дыяпазону тыпаў вылічэнняў, уключаючы інтэрактыўныя запыты і струменевую апрацоўку. Хуткасць гуляе важную ролю пры апрацоўцы вялікіх аб'ёмаў дадзеных, бо менавіта хуткасць дазваляе працаваць у інтэрактыўным рэжыме, не марнуючы хвіліны ці гадзіны на чаканне. Адно з найважных добрых якасцяў Spark, якія забяспечваюць гэтак высокую хуткасць, здольнасць выконваць вылічэнні ў памяці.

Дадзены фрэймворк напісаны на Scala, таму неабходна ўсталяваць яе ў першую чаргу:

sudo apt-get install scala

Спампоўваем з афіцыйнага сайта дыстрыбутыў Spark:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Распакоўваем архіў:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Дадаем шлях да Spark у bash-файл:

vim ~/.bashrc

Дадаем праз рэдактар ​​наступныя радкі:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Выконваем каманду ніжэй пасля ўнясення правак у bashrc:

source ~/.bashrc

Разгортванне AWS PostgreSQL

Засталося разгарнуць базу даных, куды будзем заліваць апрацаваную інфармацыю з патокаў. Для гэтага будзем выкарыстоўваць сэрвіс AWS RDS.

Заходзім у кансоль AWS -> AWS RDS -> Databases -> Create database:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Выбіраемы PostgreSQL і націскаем кнопку Next:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Т.к. дадзены прыклад разбіраецца выключна ў адукацыйных мэтах, будзем выкарыстоўваць бясплатны сервер "на мінімалках" (Free Tier):
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Далей, ставім галачку ў блоку Free Tier, і пасля гэтага нам аўтаматычна будзе прапанаваны інстанс класа t2.micro – хоць і слабенькі, але бясплатны і цалкам падыдзе для нашай задачы:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Следам ідуць вельмі важныя рэчы: найменне інстанса БД, імя майстар-карыстальніка і яго пароль. Назавем інстанст: myHabrTest, майстар-карыстальнік: хабр, пароль: хабр12345 і націскаем на кнопку Next:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

На наступнай старонцы знаходзяцца параметры, якія адказваюць за даступнасць нашага сервера БД звонку (Public accessibility) і даступнасць партоў:

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Давайце створым новую наладу для VPC security group, якая дазволіць звонку звяртацца да нашага сервера БД праз порт 5432 (PostgreSQL).
Пяройдзем у асобным акне браўзэра да кансолі AWS у падзел VPC Dashboard -> Security Groups -> Create security group:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Задаем імя для Security group - PostgreSQL, апісанне, паказваем да якой VPC дадзеная група павінна быць асацыяваная і націскаем кнопку Create:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Запаўняем для свежастворанай групы Inbound rules для порта 5432, як паказана на малюнку ніжэй. Уручную порт можна не паказваць, а абраць PostgreSQL з які расчыняецца спісу Type.

Строга кажучы, значэнне ::/0 азначае даступнасць уваходнага трафіку для сервера з усяго свету, што кананічна не зусім дакладна, але для разбору прыкладу дазволім сабе выкарыстоўваць такі падыход:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Вяртаемся да старонкі браўзэра, дзе ў нас адчынена "Configure advanced settings" і выбіраемы ў падзеле VPC security groups -> Choose existing VPC security groups -> PostgreSQL:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Далей, у падзеле Database options -> Database name -> задаем імя - habrDB.

Астатнія параметры, за выключэннем хіба што адключэння бэкапіравання (backup retention period – 0 days), маніторынгу і Performance Insights, можам пакінуць па змаўчанні. Націскаем на кнопку Стварыць базу дадзеных:
Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Апрацоўшчык патокаў

Завяршальным этапам будзе распрацоўка Spark-джобы, якая будзе кожныя дзве секунды апрацоўваць новыя дадзеныя, якія прыйшлі ад Kafka і ўносіць вынік у базу дадзеных.

Як было адзначана вышэй, кантрольныя кропкі (сheckpoints) - гэта асноўны механізм у SparkStreaming, які павінен быць настроены для забеспячэння адмоваўстойлівасці. Будзем выкарыстоўваць кантрольныя кропкі і, у выпадку падзення працэдуры, модулю Spark Streaming для аднаўлення страчаных дадзеных трэба будзе толькі вярнуцца да апошняй кантрольнай кропкі і аднавіць вылічэнні ад яе.

Кантрольную кропку можна ўключыць, усталяваўшы каталог у адмоваўстойлівай, надзейнай файлавай сістэме (напрыклад, HDFS, S3 і т. Д.), у якой будзе захавана інфармацыя кантрольнай кропкі. Гэта робіцца з дапамогай, напрыклад:

streamingContext.checkpoint(checkpointDirectory)

У нашым прыкладзе будзем выкарыстоўваць наступны падыход, а менавіта, калі checkpointDirectory існуе, то кантэкст будзе ўзноўлены з дадзеных кантрольнай кропкі. Калі каталог не існуе (г.зн. выконваецца ў першы раз), тое выклікаецца функцыя functionToCreateContext для стварэння новага кантэксту і налады DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Ствараем аб'ект DirectStream з мэтай падлучэння да топіка "transaction" пры дапамозе метаду createDirectStream бібліятэкі KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Парсім уваходныя дадзеныя ў фармаце JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Выкарыстоўваючы Spark SQL, які робіцца нескладаную групоўку і выводны вынік у кансоль:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Атрыманне тэксту запыту і запуск яго праз Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

А затым захоўваем атрыманыя агрэгаваныя дадзеныя ў табліцу ў AWS RDS. Каб захаваць вынікі агрэгацыі ў табліцу базы дадзеных, будзем выкарыстоўваць метад write аб'екта DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Некалькі слоў аб наладзе падлучэння да AWS RDS. Карыстальніка і пароль да яго мы стваралі на кроку "Разгортванне AWS PostgreSQL". У якасці url сервера баз дадзеных варта выкарыстоўваць Endpoint, які адлюстроўваецца ў падзеле Connectivity & security:

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

У мэтах карэктнай звязкі Spark і Kafka, варта запускаць джобу праз smark-submit з выкарыстаннем артэфакта spark-streaming-kafka-0-8_2.11. Дадаткова ўжыем таксама артэфакт для ўзаемадзеяння з базай дадзеных PostgreSQL, іх будзем перадаваць праз -packages.

Для гнуткасці скрыпту, вынесем у якасці ўваходных параметраў таксама найменне сервера паведамленняў і топік, з якога жадаем атрымліваць дадзеныя.

Такім чынам, прыйшоў час запусціць і праверыць працаздольнасць сістэмы:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Усё атрымалася! Як відаць на малюнку ніжэй - падчас працы прыкладання новыя вынікі агрэгацыі выводзяцца кожныя 2 секунды, таму што мы ўсталявалі інтэрвал пакетавання роўным 2 секундам, калі стваралі аб'ект StreamingContext:

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Далей, які робіцца няхітры запыт да базы дадзеных, каб праверыць наяўнасць запісаў у табліцы transaction_flow:

Apache Kafka і струменевая апрацоўка даных з дапамогай Spark Streaming

Заключэнне

У дадзеным артыкуле быў разгледжаны прыклад струменевай апрацоўкі інфармацыі з выкарыстаннем Spark Streaming у звязку з Apache Kafka і PostgreSQL. З ростам аб'ёмаў дадзеных з розных крыніц, складана пераацаніць практычную каштоўнасць Spark Streaming для стварэння струменевых прыкладанняў і прыкладанняў, якія дзейнічаюць у маштабе рэальнага часу.

Поўны зыходны код вы можаце знайсці ў маім рэпазітары на GitHub.

З задавальненнем гатовы абмеркаваць дадзены артыкул, чакаю Вашых каментароў, а таксама, спадзяюся на канструктыўную крытыку ўсіх неабыякавых чытачоў.

Жадаю поспехаў!

Ps. Першапачаткова планавалася выкарыстоўваць лакальную БД PostgreSQL, але улічваючы маё каханне да AWS, я вырашыў вынесці базу дадзеных у воблака. У наступным артыкуле па гэтай тэме я пакажу, як рэалізаваць цалкам апісаную сістэму ў AWS пры дапамозе AWS Kinesis і AWS EMR. Сачыце за навінамі!

Крыніца: habr.com

Дадаць каментар