Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Здраво, Хабр! Данас ћемо изградити систем који ће обрађивати Апацхе Кафка токове порука користећи Спарк Стреаминг и записивати резултате обраде у АВС РДС базу података у облаку.

Замислимо да нам одређена кредитна институција поставља задатак да обрађујемо пристигле трансакције „у ходу“ у свим својим филијалама. Ово се може урадити у сврху брзог израчунавања отворене валутне позиције за трезор, лимита или финансијских резултата за трансакције итд.

Како имплементирати овај случај без употребе магије и магијских чаролија - прочитајте испод реза! Иди!

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг
(извор слике)

Увод

Наравно, обрада велике количине података у реалном времену пружа широке могућности за коришћење у савременим системима. Једна од најпопуларнијих комбинација за ово је тандем Апацхе Кафка и Спарк Стреаминг, где Кафка креира ток долазних пакета порука, а Спарк Стреаминг обрађује ове пакете у датом временском интервалу.

Да бисмо повећали толеранцију на грешке апликације, користићемо контролне тачке. Са овим механизмом, када Спарк Стреаминг мотор треба да поврати изгубљене податке, само треба да се врати на последњу контролну тачку и одатле настави прорачуне.

Архитектура развијеног система

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Коришћене компоненте:

  • Апацхе Кафка је дистрибуирани систем за размену порука за објављивање-претплату. Погодно за коришћење порука ван мреже и на мрежи. Да би се спречио губитак података, Кафка поруке се чувају на диску и реплицирају унутар кластера. Кафка систем је изграђен на врху сервиса за синхронизацију ЗооКеепер;
  • Апацхе Спарк Стреаминг - Спарк компонента за обраду стриминг података. Модул Спарк Стреаминг је изграђен коришћењем микро-батцх архитектуре, где се ток података тумачи као континуирани низ малих пакета података. Спарк Стреаминг узима податке из различитих извора и комбинује их у мале пакете. Нови пакети се креирају у редовним интервалима. На почетку сваког временског интервала креира се нови пакет и сви подаци примљени током тог интервала се укључују у пакет. На крају интервала, раст пакета се зауставља. Величина интервала је одређена параметром који се зове интервал серије;
  • Апацхе Спарк СКЛ - комбинује релациону обраду са Спарк функционалним програмирањем. Структурирани подаци означавају податке који имају шему, односно један скуп поља за све записе. Спарк СКЛ подржава унос из различитих структурираних извора података и, захваљујући доступности информација о шеми, може ефикасно да преузме само потребна поља записа, а такође обезбеђује АПИ-је за ДатаФраме;
  • АВС РДС је релативно јефтина релациона база података заснована на облаку, веб сервис који поједностављује подешавање, рад и скалирање, а њиме директно администрира Амазон.

Инсталирање и покретање Кафка сервера

Пре него што директно користите Кафку, морате се уверити да имате Јава, јер... ЈВМ се користи за рад:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Хајде да направимо новог корисника за рад са Кафком:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Затим преузмите дистрибуцију са званичне веб странице Апацхе Кафка:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Распакујте преузету архиву:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Следећи корак је опциони. Чињеница је да подразумеване поставке не дозвољавају да у потпуности користите све функције Апацхе Кафке. На пример, избришите тему, категорију, групу у којој се поруке могу објављивати. Да бисмо ово променили, уредимо конфигурациону датотеку:

vim ~/kafka/config/server.properties

Додајте следеће на крај датотеке:

delete.topic.enable = true

Пре него што покренете Кафка сервер, потребно је да покренете сервер ЗооКеепер; користићемо помоћну скрипту која долази са Кафка дистрибуцијом:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Након што се ЗооКеепер успешно покрене, покрените Кафка сервер у посебном терминалу:

bin/kafka-server-start.sh config/server.properties

Хајде да направимо нову тему под називом Трансакција:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Хајде да се уверимо да је креирана тема са потребним бројем партиција и репликације:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Пропустимо тренутке тестирања произвођача и потрошача за новонасталу тему. Више детаља о томе како можете тестирати слање и примање порука је написано у званичној документацији - Пошаљите неке поруке. Па, прелазимо на писање произвођача у Питхон-у користећи КафкаПродуцер АПИ.

Писање продуцента

Произвођач ће генерисати насумичне податке - 100 порука сваке секунде. Под случајним подацима подразумевамо речник који се састоји од три поља:

  • Филијала — назив продајног места кредитне институције;
  • Валута — валута трансакције;
  • Износ - Износ трансакције. Износ ће бити позитиван број ако се ради о куповини валуте од стране Банке, а негативан ако је у питању продаја.

Код произвођача изгледа овако:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Затим, користећи метод слања, шаљемо поруку серверу, на тему која нам је потребна, у ЈСОН формату:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Када покренемо скрипту, добијамо следеће поруке у терминалу:

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

То значи да све функционише како смо желели - произвођач генерише и шаље поруке на тему која нам је потребна.
Следећи корак је да инсталирате Спарк и обрадите овај ток порука.

Инсталирање Апацхе Спарк-а

Апацхе Спарк је универзална и кластер рачунарска платформа високих перформанси.

Спарк ради боље од популарних имплементација МапРедуце модела док подржава шири спектар типова прорачуна, укључујући интерактивне упите и обраду тока. Брзина игра важну улогу при обради великих количина података, јер је брзина та која вам омогућава да радите интерактивно без трошења минута или сати на чекање. Једна од највећих предности Спарк-а која га чини тако брзим је његова способност да изврши прорачуне у меморији.

Овај оквир је написан у Сцали, тако да прво морате да га инсталирате:

sudo apt-get install scala

Преузмите дистрибуцију Спарк са званичне веб странице:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Распакујте архиву:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Додајте путању до Спарк-а у басх датотеку:

vim ~/.bashrc

Додајте следеће редове кроз уређивач:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Покрените наредбу испод након што унесете измене у басхрц:

source ~/.bashrc

Примена АВС ПостгреСКЛ

Остаје само да поставимо базу података у коју ћемо учитати обрађене информације из стримова. За ово ћемо користити услугу АВС РДС.

Идите на АВС конзолу -> АВС РДС -> Базе података -> Креирај базу података:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Изаберите ПостгреСКЛ и кликните на Даље:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Јер Овај пример је само у образовне сврхе; користићемо бесплатни сервер „најмање“ (бесплатни ниво):
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Затим стављамо квачицу у блок Фрее Тиер, а након тога ће нам аутоматски бити понуђена инстанца класе т2.мицро - иако слаба, бесплатна је и сасвим погодна за наш задатак:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Следе веома важне ствари: име инстанце базе података, име главног корисника и његова лозинка. Назовимо инстанцу: миХабрТест, главни корисник: хабр, Лозинка: хабр12345 и кликните на дугме Даље:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

На следећој страници налазе се параметри одговорни за приступачност нашег сервера базе података споља (Публиц аццессибилити) и доступност порта:

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Хајде да направимо нову поставку за ВПЦ безбедносну групу, која ће омогућити екстерни приступ нашем серверу базе података преко порта 5432 (ПостгреСКЛ).
Идемо на АВС конзолу у посебном прозору прегледача на ВПЦ контролну таблу -> Безбедносне групе -> Креирај одељак безбедносне групе:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Постављамо име за безбедносну групу - ПостгреСКЛ, опис, означавамо са којим ВПЦ-ом ова група треба да буде повезана и кликнемо на дугме Креирај:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Попуните улазна правила за порт 5432 за новокреирану групу, као што је приказано на слици испод. Не можете ручно да одредите порт, већ изаберите ПостгреСКЛ са падајуће листе Тип.

Строго говорећи, вредност ::/0 означава доступност долазног саобраћаја на сервер из целог света, што канонски није сасвим тачно, али да анализирамо пример, дозволимо себи да користимо овај приступ:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Враћамо се на страницу претраживача, где имамо отворену „Конфигуриши напредна подешавања“ и бирамо у одељку ВПЦ безбедносне групе -> Изабери постојеће ВПЦ безбедносне групе -> ПостгреСКЛ:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Затим, у опцијама базе података -> Име базе података -> поставите име - хабрДБ.

Преостале параметре, са изузетком онемогућавања резервне копије (период задржавања резервне копије - 0 дана), праћења и увида у перформансе, можемо оставити подразумевано. Кликните на дугме Креирајте базу података:
Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Тхреад хандлер

Завршна фаза биће развој Спарк посла, који ће сваке две секунде обрађивати нове податке који долазе од Кафке и уносити резултат у базу података.

Као што је горе наведено, контролне тачке су основни механизам у СпаркСтреаминг-у који мора бити конфигурисан да би се осигурала толеранција грешака. Користићемо контролне тачке и, ако процедура не успе, модул Спарк Стреаминг ће морати само да се врати на последњу контролну тачку и настави са прорачунима са ње да би повратио изгубљене податке.

Контролна тачка се може омогућити постављањем директоријума на поузданом систему датотека отпорном на грешке (као што је ХДФС, С3, итд.) у којем ће бити ускладиштене информације о контролној тачки. Ово се ради помоћу, на пример:

streamingContext.checkpoint(checkpointDirectory)

У нашем примеру користићемо следећи приступ, наиме, ако цхецкпоинтДирецтори постоји, онда ће контекст бити поново креиран из података контролне тачке. Ако директоријум не постоји (тј. извршава се први пут), функцијаТоЦреатеЦонтект се позива да креира нови контекст и конфигурише ДСтреамс:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Креирамо ДирецтСтреам објекат да се повежемо са темом „трансакције“ користећи методу цреатеДирецтСтреам библиотеке КафкаУтилс:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Рашчлањивање долазних података у ЈСОН формату:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Користећи Спарк СКЛ, радимо једноставно груписање и приказујемо резултат у конзоли:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Преузимање текста упита и покретање кроз Спарк СКЛ:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

А затим сачувамо резултујуће обједињене податке у табелу у АВС РДС. Да бисмо сачували резултате агрегације у табели базе података, користићемо метод писања објекта ДатаФраме:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Неколико речи о подешавању везе са АВС РДС. Направили смо корисника и лозинку за њега у кораку „Примена АВС ПостгреСКЛ-а“. Требало би да користите Ендпоинт као УРЛ сервера базе података, који је приказан у одељку Повезивање и безбедност:

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Да бисте исправно повезали Спарк и Кафку, требало би да покренете посао преко смарк-субмит користећи артефакт спарк-стреаминг-кафка-0-8_2.11. Поред тога, користићемо и артефакт за интеракцију са ПостгреСКЛ базом података; пренећемо их преко --пацкагес.

Ради флексибилности скрипте, као улазне параметре укључићемо и назив сервера порука и тему из које желимо да примамо податке.

Дакле, време је да покренете и проверите функционалност система:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Све је успело! Као што можете видети на слици испод, док је апликација покренута, нови резултати агрегације се излазе сваке 2 секунде, јер смо поставили интервал батцхинг-а на 2 секунде када смо креирали СтреамингЦонтект објекат:

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Затим правимо једноставан упит бази података да проверимо присуство записа у табели трансакцијски_ток:

Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг

Закључак

Овај чланак је разматрао пример стреам обраде информација помоћу Спарк Стреаминг-а у комбинацији са Апацхе Кафка и ПостгреСКЛ-ом. Са порастом података из различитих извора, тешко је преценити практичну вредност Спарк Стреаминг-а за креирање стриминг и апликација у реалном времену.

Комплетан изворни код можете пронаћи у мом спремишту на ГитХуб.

Драго ми је да разговарам о овом чланку, радујем се вашим коментарима, а такође се надам конструктивној критици свих брижних читалаца.

Желим вам успех!

Пс. У почетку је било планирано да се користи локална ПостгреСКЛ база података, али с обзиром на моју љубав према АВС-у, одлучио сам да преместим базу података у облак. У следећем чланку на ову тему показаћу како да имплементирате цео систем описан горе у АВС користећи АВС Кинесис и АВС ЕМР. Пратите вести!

Извор: ввв.хабр.цом

Додај коментар