Apache Kafka и поточно предаване на данни с Spark Streaming

Хей Хабр! Днес ще изградим система, която ще обработва потоци от съобщения на Apache Kafka с помощта на Spark Streaming и ще запише резултата от обработката в облачната база данни на AWS RDS.

Нека си представим, че определена кредитна институция поставя пред нас задачата да обработва входящи транзакции "в движение" за всички свои клонове. Това може да се направи с цел бързо изчисляване на отворената валутна позиция за хазната, лимити или финансов резултат по транзакции и др.

Как да приложите този случай без използването на магия и магически заклинания - прочетете под разреза! Отивам!

Apache Kafka и поточно предаване на данни с Spark Streaming
(Източник на снимката)

въведение

Разбира се, обработката в реално време на голямо количество данни предоставя широки възможности за използване в съвременните системи. Една от най-популярните комбинации за това е тандемът на Apache Kafka и Spark Streaming, където Kafka създава поток от входящи пакети съобщения, а Spark Streaming обработва тези пакети на определен интервал от време.

За да подобрим устойчивостта на грешки на приложението, ще използваме контролни точки - контролни точки. С този механизъм, когато модулът Spark Streaming трябва да възстанови изгубени данни, той трябва само да се върне към последната контролна точка и да възобнови изчисленията оттам.

Архитектурата на разработената система

Apache Kafka и поточно предаване на данни с Spark Streaming

Използвани компоненти:

  • Апачи Кафка е разпределена система за публикуване и абониране на съобщения. Подходящ както за офлайн, така и за онлайн консумация на съобщения. За да се предотврати загуба на данни, Kafka съобщенията се съхраняват на диска и се репликират в рамките на клъстера. Системата Kafka е изградена върху услугата за синхронизация ZooKeeper;
  • Apache Spark Streaming - Spark компонент за обработка на поточни данни. Модулът Spark Streaming е изграден с помощта на микропакетна архитектура, когато поток от данни се интерпретира като непрекъсната последователност от малки пакети данни. Spark Streaming взема данни от различни източници и ги комбинира в малки партиди. Нови пакети се създават на редовни интервали. В началото на всеки интервал от време се създава нов пакет и всички данни, получени през този интервал, се включват в пакета. В края на интервала растежът на пакета спира. Размерът на интервала се определя от параметър, наречен пакетен интервал;
  • Apache Spark SQL - Комбинира релационна обработка с функционално програмиране на Spark. Структурираните данни се отнасят до данни, които имат схема, тоест единичен набор от полета за всички записи. Spark SQL поддържа въвеждане от различни източници на структурирани данни и, поради наличието на информация за схемата, може ефективно да извлича само необходимите полета от записи, а също така предоставя DataFrame API;
  • AWS RDS е сравнително евтина релационна база данни, базирана на облак, уеб услуга, която опростява настройката, работата и мащабирането, администрирана директно от Amazon.

Инсталиране и стартиране на Kafka сървър

Преди да използвате директно Kafka, трябва да се уверите, че имате Java, т.к JVM се използва за работа:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Нека създадем нов потребител за работа с Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

След това изтеглете комплекта за разпространение от официалния уебсайт на Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Разопаковайте изтегления архив:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Следващата стъпка не е задължителна. Факт е, че настройките по подразбиране не ви позволяват да използвате напълно всички функции на Apache Kafka. Например изтриване на тема, категория, група, в която могат да се публикуват съобщения. За да промените това, нека редактираме конфигурационния файл:

vim ~/kafka/config/server.properties

Добавете следното в края на файла:

delete.topic.enable = true

Преди да стартирате сървъра Kafka, трябва да стартирате сървъра ZooKeeper, ние ще използваме помощния скрипт, който идва с дистрибуцията на Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

След като ZooKeeper стартира успешно, стартираме Kafka сървъра в отделен терминал:

bin/kafka-server-start.sh config/server.properties

Нека създадем нова тема, наречена Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Нека се уверим, че е създадена тема с необходимия брой дялове и репликация:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka и поточно предаване на данни с Spark Streaming

Нека пропуснем моментите на тестване на производителя и потребителя за новосъздадената тема. Повече подробности за това как можете да тествате изпращането и получаването на съобщения са написани в официалната документация - Изпратете няколко съобщения. Е, преминаваме към писане на продуцент в Python, използвайки KafkaProducer API.

Продуцентско писане

Производителят ще генерира произволни данни - 100 съобщения всяка секунда. Под произволни данни имаме предвид речник, състоящ се от три полета:

  • Клон — име на пункта за продажба на кредитната институция;
  • Валута — валута на сделката;
  • Количество - сума на транзакцията. Сумата ще бъде положителна, ако е покупка на валута от Банката, и отрицателна, ако е продажба.

Кодът на производителя изглежда така:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

След това, използвайки метода за изпращане, изпращаме съобщение до сървъра, до темата, от която се нуждаем, във формат JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Когато изпълняваме скрипта, получаваме следните съобщения в терминала:

Apache Kafka и поточно предаване на данни с Spark Streaming

Това означава, че всичко работи както искахме - производителят генерира и изпраща съобщения до темата, от която се нуждаем.
Следващата стъпка е да инсталирате Spark и да обработите този поток от съобщения.

Инсталиране на Apache Spark

Апачи Спарк е многофункционална и високопроизводителна клъстерна изчислителна платформа.

Spark превъзхожда популярните реализации на модела MapReduce по отношение на производителността, като същевременно осигурява поддръжка за по-широк набор от типове изчисления, включително интерактивни заявки и стрийминг. Скоростта играе важна роля при обработката на големи количества данни, тъй като именно скоростта ви позволява да работите интерактивно, без да прекарвате минути или часове в чакане. Една от най-силните страни на Spark за предоставяне на тази скорост е способността му да извършва изчисления в паметта.

Тази рамка е написана на Scala, така че първо трябва да я инсталирате:

sudo apt-get install scala

Изтеглете дистрибуцията на Spark от официалния уебсайт:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Разопаковайте архива:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Добавете пътя към Spark към bash файла:

vim ~/.bashrc

Добавете следните редове чрез редактора:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Изпълнете командата по-долу, след като направите промени в bashrc:

source ~/.bashrc

Внедряване на AWS PostgreSQL

Остава да разширим базата данни, където ще попълваме обработената информация от потоците. За целта ще използваме услугата AWS RDS.

Отидете на AWS конзолата -> AWS RDS -> Бази данни -> Създаване на база данни:
Apache Kafka и поточно предаване на данни с Spark Streaming

Изберете PostgreSQL и щракнете върху бутона Напред:
Apache Kafka и поточно предаване на данни с Spark Streaming

защото този пример е анализиран единствено за образователни цели, ние ще използваме безплатен сървър „като минимум“ (Free Tier):
Apache Kafka и поточно предаване на данни с Spark Streaming

След това проверете блока Free Tier и след това автоматично ще ни бъде предложен екземпляр от класа t2.micro - макар и слаб, но безплатен и доста подходящ за нашата задача:
Apache Kafka и поточно предаване на данни с Spark Streaming

Следват много важни неща: името на инстанцията на DB, името на главния потребител и неговата парола. Нека извикаме екземпляра: myHabrTest, главен потребител: хабр, парола: habr12345 и щракнете върху бутона Напред:
Apache Kafka и поточно предаване на данни с Spark Streaming

Следващата страница съдържа параметрите, отговорни за достъпността на нашия сървър на база данни отвън (публична достъпност) и наличността на портове:

Apache Kafka и поточно предаване на данни с Spark Streaming

Нека създадем нова настройка за групата за сигурност на VPC, която ще позволи външен достъп до нашия сървър на база данни през порт 5432 (PostgreSQL).
Нека отидем в отделен прозорец на браузъра към конзолата на AWS в таблото за управление на VPC -> Групи за сигурност -> раздел Създаване на група за сигурност:
Apache Kafka и поточно предаване на данни с Spark Streaming

Задаваме името на групата за сигурност - PostgreSQL, описание, посочваме с кой VPC трябва да бъде свързана тази група и щракнете върху бутона Създаване:
Apache Kafka и поточно предаване на данни с Spark Streaming

Попълваме новосъздадената група Inbound rules за порт 5432, както е показано на снимката по-долу. Не можете да посочите порта ръчно, но изберете PostgreSQL от падащия списък Тип.

Строго погледнато, стойността ::/0 означава наличието на входящ трафик за сървъра от цял ​​свят, което не е съвсем вярно канонично, но за да анализираме примера, нека използваме този подход:
Apache Kafka и поточно предаване на данни с Spark Streaming

Връщаме се на страницата на браузъра, където имаме отворено „Конфигуриране на разширени настройки“ и избираме VPC групи за сигурност -> Изберете съществуващи VPC групи за сигурност -> PostgreSQL в секцията:
Apache Kafka и поточно предаване на данни с Spark Streaming

След това в секцията Опции за база данни -> Име на база данни -> задайте името - habrDB.

Можем да оставим останалите параметри, с изключение на деактивирането на архивиране (период на съхранение на архива - 0 дни), мониторинг и Performance Insights, по подразбиране. Кликнете върху бутона Създайте база данни:
Apache Kafka и поточно предаване на данни с Spark Streaming

Обработчик на поток

Последният етап ще бъде разработването на задача на Spark, която ще обработва нови данни от Kafka на всеки две секунди и ще въвежда резултата в базата данни.

Както беше отбелязано по-горе, контролните точки са основният механизъм в SparkStreaming, който трябва да бъде конфигуриран, за да осигури толерантност към грешки. Ще използваме контролни точки и в случай на повреда на процедурата, модулът Spark Streaming ще трябва само да се върне към последната контролна точка и да възобнови изчисленията от нея, за да възстанови изгубените данни.

Контролна точка може да бъде активирана чрез задаване на директория на устойчива на грешки, надеждна файлова система (напр. HDFS, S3 и т.н.), където ще се съхранява информацията за контролната точка. Това става например с:

streamingContext.checkpoint(checkpointDirectory)

В нашия пример ще използваме следния подход, а именно, ако checkpointDirectory съществува, тогава контекстът ще бъде пресъздаден от данните за контролната точка. Ако директорията не съществува (т.е. тя се изпълнява за първи път), тогава функцията functionToCreateContext се извиква, за да създаде нов контекст и да настрои DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Създаваме DirectStream обект, за да се свържем с темата „транзакция“, като използваме метода createDirectStream на библиотеката KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Анализ на входящи данни във формат JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Използвайки Spark SQL, ние правим просто групиране и извеждаме резултата на конзолата:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Получаване на тялото на заявката и изпълнението й през Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

И след това запазваме получените обобщени данни в таблица в AWS RDS. За да запазим резултатите от агрегирането в таблица на база данни, ще използваме метода за запис на обекта DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Няколко думи за настройка на връзка към AWS RDS. Създадохме потребителя и паролата за него в стъпката „Внедряване на AWS PostgreSQL“. Като url на сървъра на базата данни трябва да използвате крайната точка, която се показва в раздела Свързване и сигурност:

Apache Kafka и поточно предаване на данни с Spark Streaming

За да свържете правилно Spark и Kafka, трябва да стартирате заданието чрез smark-submit, като използвате артефакта spark-streaming-kafka-0-8_2.11. Освен това ще използваме и артефакт за взаимодействие с базата данни на PostgreSQL, ще ги предаваме през --packages.

За гъвкавост на скрипта ще извадим името на сървъра за съобщения и темата, от която искаме да получаваме данни, като входни параметри.

И така, време е да стартирате и тествате системата:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Всичко се получи! Както можете да видите на снимката по-долу, докато приложението работи, нови резултати от агрегацията се показват на всеки 2 секунди, тъй като зададохме интервала на групиране на 2 секунди, когато създадохме обекта StreamingContext:

Apache Kafka и поточно предаване на данни с Spark Streaming

След това правим проста заявка към базата данни, за да проверим за записи в таблицата транзакционен_поток:

Apache Kafka и поточно предаване на данни с Spark Streaming

Заключение

В тази статия беше разгледан пример за обработка на поточно предаване на информация с помощта на Spark Streaming във връзка с Apache Kafka и PostgreSQL. С нарастването на обемите данни от различни източници е трудно да се надцени практическата стойност на Spark Streaming за създаване на приложения в реално време и стрийминг.

Можете да намерите пълния изходен код в моето хранилище на GitHub.

Щастлив съм да обсъдя тази статия, очаквам вашите коментари и също така се надявам на градивна критика от всички заинтересовани читатели.

Желая ви успех!

Пс. Първоначално беше планирано да се използва локална база данни PostgreSQL, но предвид любовта ми към AWS реших да преместя базата данни в облака. В следващата статия по тази тема ще ви покажа как да внедрите цялата описана по-горе система в AWS с помощта на AWS Kinesis и AWS EMR. Следете новините!

Източник: www.habr.com

Добавяне на нов коментар