Апацхе Кафка и стриминг обрада података уз Спарк Стреаминг
Здраво, Хабр! Данас ћемо изградити систем који ће обрађивати Апацхе Кафка токове порука користећи Спарк Стреаминг и записивати резултате обраде у АВС РДС базу података у облаку.
Замислимо да нам одређена кредитна институција поставља задатак да обрађујемо пристигле трансакције „у ходу“ у свим својим филијалама. Ово се може урадити у сврху брзог израчунавања отворене валутне позиције за трезор, лимита или финансијских резултата за трансакције итд.
Како имплементирати овај случај без употребе магије и магијских чаролија - прочитајте испод реза! Иди!
Наравно, обрада велике количине података у реалном времену пружа широке могућности за коришћење у савременим системима. Једна од најпопуларнијих комбинација за ово је тандем Апацхе Кафка и Спарк Стреаминг, где Кафка креира ток долазних пакета порука, а Спарк Стреаминг обрађује ове пакете у датом временском интервалу.
Да бисмо повећали толеранцију на грешке апликације, користићемо контролне тачке. Са овим механизмом, када Спарк Стреаминг мотор треба да поврати изгубљене податке, само треба да се врати на последњу контролну тачку и одатле настави прорачуне.
Архитектура развијеног система
Коришћене компоненте:
Апацхе Кафка је дистрибуирани систем за размену порука за објављивање-претплату. Погодно за коришћење порука ван мреже и на мрежи. Да би се спречио губитак података, Кафка поруке се чувају на диску и реплицирају унутар кластера. Кафка систем је изграђен на врху сервиса за синхронизацију ЗооКеепер;
Апацхе Спарк Стреаминг - Спарк компонента за обраду стриминг података. Модул Спарк Стреаминг је изграђен коришћењем микро-батцх архитектуре, где се ток података тумачи као континуирани низ малих пакета података. Спарк Стреаминг узима податке из различитих извора и комбинује их у мале пакете. Нови пакети се креирају у редовним интервалима. На почетку сваког временског интервала креира се нови пакет и сви подаци примљени током тог интервала се укључују у пакет. На крају интервала, раст пакета се зауставља. Величина интервала је одређена параметром који се зове интервал серије;
Апацхе Спарк СКЛ - комбинује релациону обраду са Спарк функционалним програмирањем. Структурирани подаци означавају податке који имају шему, односно један скуп поља за све записе. Спарк СКЛ подржава унос из различитих структурираних извора података и, захваљујући доступности информација о шеми, може ефикасно да преузме само потребна поља записа, а такође обезбеђује АПИ-је за ДатаФраме;
АВС РДС је релативно јефтина релациона база података заснована на облаку, веб сервис који поједностављује подешавање, рад и скалирање, а њиме директно администрира Амазон.
Инсталирање и покретање Кафка сервера
Пре него што директно користите Кафку, морате се уверити да имате Јава, јер... ЈВМ се користи за рад:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Следећи корак је опциони. Чињеница је да подразумеване поставке не дозвољавају да у потпуности користите све функције Апацхе Кафке. На пример, избришите тему, категорију, групу у којој се поруке могу објављивати. Да бисмо ово променили, уредимо конфигурациону датотеку:
vim ~/kafka/config/server.properties
Додајте следеће на крај датотеке:
delete.topic.enable = true
Пре него што покренете Кафка сервер, потребно је да покренете сервер ЗооКеепер; користићемо помоћну скрипту која долази са Кафка дистрибуцијом:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Након што се ЗооКеепер успешно покрене, покрените Кафка сервер у посебном терминалу:
Пропустимо тренутке тестирања произвођача и потрошача за новонасталу тему. Више детаља о томе како можете тестирати слање и примање порука је написано у званичној документацији - Пошаљите неке поруке. Па, прелазимо на писање произвођача у Питхон-у користећи КафкаПродуцер АПИ.
Писање продуцента
Произвођач ће генерисати насумичне податке - 100 порука сваке секунде. Под случајним подацима подразумевамо речник који се састоји од три поља:
Филијала — назив продајног места кредитне институције;
Валута — валута трансакције;
Износ - Износ трансакције. Износ ће бити позитиван број ако се ради о куповини валуте од стране Банке, а негативан ако је у питању продаја.
Затим, користећи метод слања, шаљемо поруку серверу, на тему која нам је потребна, у ЈСОН формату:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Када покренемо скрипту, добијамо следеће поруке у терминалу:
То значи да све функционише како смо желели - произвођач генерише и шаље поруке на тему која нам је потребна.
Следећи корак је да инсталирате Спарк и обрадите овај ток порука.
Инсталирање Апацхе Спарк-а
Апацхе Спарк је универзална и кластер рачунарска платформа високих перформанси.
Спарк ради боље од популарних имплементација МапРедуце модела док подржава шири спектар типова прорачуна, укључујући интерактивне упите и обраду тока. Брзина игра важну улогу при обради великих количина података, јер је брзина та која вам омогућава да радите интерактивно без трошења минута или сати на чекање. Једна од највећих предности Спарк-а која га чини тако брзим је његова способност да изврши прорачуне у меморији.
Овај оквир је написан у Сцали, тако да прво морате да га инсталирате:
sudo apt-get install scala
Преузмите дистрибуцију Спарк са званичне веб странице:
Покрените наредбу испод након што унесете измене у басхрц:
source ~/.bashrc
Примена АВС ПостгреСКЛ
Остаје само да поставимо базу података у коју ћемо учитати обрађене информације из стримова. За ово ћемо користити услугу АВС РДС.
Идите на АВС конзолу -> АВС РДС -> Базе података -> Креирај базу података:
Изаберите ПостгреСКЛ и кликните на Даље:
Јер Овај пример је само у образовне сврхе; користићемо бесплатни сервер „најмање“ (бесплатни ниво):
Затим стављамо квачицу у блок Фрее Тиер, а након тога ће нам аутоматски бити понуђена инстанца класе т2.мицро - иако слаба, бесплатна је и сасвим погодна за наш задатак:
Следе веома важне ствари: име инстанце базе података, име главног корисника и његова лозинка. Назовимо инстанцу: миХабрТест, главни корисник: хабр, Лозинка: хабр12345 и кликните на дугме Даље:
На следећој страници налазе се параметри одговорни за приступачност нашег сервера базе података споља (Публиц аццессибилити) и доступност порта:
Хајде да направимо нову поставку за ВПЦ безбедносну групу, која ће омогућити екстерни приступ нашем серверу базе података преко порта 5432 (ПостгреСКЛ).
Идемо на АВС конзолу у посебном прозору прегледача на ВПЦ контролну таблу -> Безбедносне групе -> Креирај одељак безбедносне групе:
Постављамо име за безбедносну групу - ПостгреСКЛ, опис, означавамо са којим ВПЦ-ом ова група треба да буде повезана и кликнемо на дугме Креирај:
Попуните улазна правила за порт 5432 за новокреирану групу, као што је приказано на слици испод. Не можете ручно да одредите порт, већ изаберите ПостгреСКЛ са падајуће листе Тип.
Строго говорећи, вредност ::/0 означава доступност долазног саобраћаја на сервер из целог света, што канонски није сасвим тачно, али да анализирамо пример, дозволимо себи да користимо овај приступ:
Враћамо се на страницу претраживача, где имамо отворену „Конфигуриши напредна подешавања“ и бирамо у одељку ВПЦ безбедносне групе -> Изабери постојеће ВПЦ безбедносне групе -> ПостгреСКЛ:
Затим, у опцијама базе података -> Име базе података -> поставите име - хабрДБ.
Преостале параметре, са изузетком онемогућавања резервне копије (период задржавања резервне копије - 0 дана), праћења и увида у перформансе, можемо оставити подразумевано. Кликните на дугме Креирајте базу података:
Тхреад хандлер
Завршна фаза биће развој Спарк посла, који ће сваке две секунде обрађивати нове податке који долазе од Кафке и уносити резултат у базу података.
Као што је горе наведено, контролне тачке су основни механизам у СпаркСтреаминг-у који мора бити конфигурисан да би се осигурала толеранција грешака. Користићемо контролне тачке и, ако процедура не успе, модул Спарк Стреаминг ће морати само да се врати на последњу контролну тачку и настави са прорачунима са ње да би повратио изгубљене податке.
Контролна тачка се може омогућити постављањем директоријума на поузданом систему датотека отпорном на грешке (као што је ХДФС, С3, итд.) у којем ће бити ускладиштене информације о контролној тачки. Ово се ради помоћу, на пример:
streamingContext.checkpoint(checkpointDirectory)
У нашем примеру користићемо следећи приступ, наиме, ако цхецкпоинтДирецтори постоји, онда ће контекст бити поново креиран из података контролне тачке. Ако директоријум не постоји (тј. извршава се први пут), функцијаТоЦреатеЦонтект се позива да креира нови контекст и конфигурише ДСтреамс:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Креирамо ДирецтСтреам објекат да се повежемо са темом „трансакције“ користећи методу цреатеДирецтСтреам библиотеке КафкаУтилс:
Користећи Спарк СКЛ, радимо једноставно груписање и приказујемо резултат у конзоли:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Преузимање текста упита и покретање кроз Спарк СКЛ:
А затим сачувамо резултујуће обједињене податке у табелу у АВС РДС. Да бисмо сачували резултате агрегације у табели базе података, користићемо метод писања објекта ДатаФраме:
Неколико речи о подешавању везе са АВС РДС. Направили смо корисника и лозинку за њега у кораку „Примена АВС ПостгреСКЛ-а“. Требало би да користите Ендпоинт као УРЛ сервера базе података, који је приказан у одељку Повезивање и безбедност:
Да бисте исправно повезали Спарк и Кафку, требало би да покренете посао преко смарк-субмит користећи артефакт спарк-стреаминг-кафка-0-8_2.11. Поред тога, користићемо и артефакт за интеракцију са ПостгреСКЛ базом података; пренећемо их преко --пацкагес.
Ради флексибилности скрипте, као улазне параметре укључићемо и назив сервера порука и тему из које желимо да примамо податке.
Дакле, време је да покренете и проверите функционалност система:
Све је успело! Као што можете видети на слици испод, док је апликација покренута, нови резултати агрегације се излазе сваке 2 секунде, јер смо поставили интервал батцхинг-а на 2 секунде када смо креирали СтреамингЦонтект објекат:
Затим правимо једноставан упит бази података да проверимо присуство записа у табели трансакцијски_ток:
Закључак
Овај чланак је разматрао пример стреам обраде информација помоћу Спарк Стреаминг-а у комбинацији са Апацхе Кафка и ПостгреСКЛ-ом. Са порастом података из различитих извора, тешко је преценити практичну вредност Спарк Стреаминг-а за креирање стриминг и апликација у реалном времену.
Комплетан изворни код можете пронаћи у мом спремишту на ГитХуб.
Драго ми је да разговарам о овом чланку, радујем се вашим коментарима, а такође се надам конструктивној критици свих брижних читалаца.
Желим вам успех!
Пс. У почетку је било планирано да се користи локална ПостгреСКЛ база података, али с обзиром на моју љубав према АВС-у, одлучио сам да преместим базу података у облак. У следећем чланку на ову тему показаћу како да имплементирате цео систем описан горе у АВС користећи АВС Кинесис и АВС ЕМР. Пратите вести!