ProHoster > Blog > Ma'muriyat > Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash
Salom, Xabr! Bugun biz Spark Streaming yordamida Apache Kafka xabar oqimlarini qayta ishlaydigan tizimni quramiz va ishlov berish natijalarini AWS RDS bulutli ma'lumotlar bazasiga yozamiz.
Tasavvur qilaylik, ma'lum bir kredit tashkiloti bizga o'zining barcha filiallari bo'ylab kiruvchi tranzaktsiyalarni "ucharda" qayta ishlash vazifasini qo'yadi. Bu xazina uchun ochiq valyuta pozitsiyasini, operatsiyalar bo'yicha limitlarni yoki moliyaviy natijalarni va hokazolarni tezkor hisoblash maqsadida amalga oshirilishi mumkin.
Bu ishni sehrli va sehrli afsunlardan foydalanmasdan qanday amalga oshirish kerak - kesish ostida o'qing! Bor!
Albatta, real vaqt rejimida katta hajmdagi ma’lumotlarni qayta ishlash zamonaviy tizimlarda foydalanish uchun keng imkoniyatlar yaratadi. Buning uchun eng mashhur kombinatsiyalardan biri Apache Kafka va Spark Streaming tandemidir, bu yerda Kafka kiruvchi xabarlar paketlari oqimini yaratadi va Spark Streaming bu paketlarni ma'lum vaqt oralig'ida qayta ishlaydi.
Ilovaning xatolarga chidamliligini oshirish uchun biz nazorat nuqtalaridan foydalanamiz. Ushbu mexanizm yordamida Spark Streaming mexanizmi yo'qolgan ma'lumotlarni qayta tiklashi kerak bo'lganda, u faqat oxirgi nazorat nuqtasiga qaytib, u erdan hisob-kitoblarni davom ettirishi kerak.
Rivojlangan tizimning arxitekturasi
Ishlatilgan komponentlar:
Apache Kafka tarqatilgan nashr-obuna xabarlar tizimidir. Ham oflayn, ham onlayn xabarlarni iste'mol qilish uchun javob beradi. Ma'lumotlar yo'qotilishining oldini olish uchun Kafka xabarlari diskda saqlanadi va klaster ichida takrorlanadi. Kafka tizimi ZooKeeper sinxronizatsiya xizmati ustiga qurilgan;
Apache Spark Streaming - Oqimli ma'lumotlarni qayta ishlash uchun Spark komponenti. Spark Streaming moduli mikro-to'plamli arxitekturadan foydalangan holda qurilgan, bu erda ma'lumotlar oqimi kichik ma'lumotlar paketlarining uzluksiz ketma-ketligi sifatida talqin etiladi. Spark Streaming turli manbalardan ma'lumotlarni oladi va ularni kichik paketlarga birlashtiradi. Yangi paketlar muntazam ravishda yaratiladi. Har bir vaqt oralig'ining boshida yangi paket yaratiladi va shu vaqt oralig'ida olingan har qanday ma'lumotlar paketga kiritiladi. Intervalning oxirida paketlar o'sishi to'xtaydi. Intervalning o'lchami partiya oralig'i deb ataladigan parametr bilan aniqlanadi;
Apache Spark SQL - relyatsion ishlov berishni Spark funktsional dasturlash bilan birlashtiradi. Strukturaviy ma'lumotlar deganda sxemaga ega bo'lgan ma'lumotlar, ya'ni barcha yozuvlar uchun yagona maydonlar to'plami tushuniladi. Spark SQL turli tuzilgan ma'lumotlar manbalaridan kiritishni qo'llab-quvvatlaydi va sxema ma'lumotlarining mavjudligi tufayli u faqat kerakli yozuv maydonlarini samarali ravishda olishi mumkin, shuningdek DataFrame API'larini taqdim etadi;
AWS RDS nisbatan arzon bulutga asoslangan relyatsion ma'lumotlar bazasi, sozlash, ishlatish va masshtablashni soddalashtiradigan va to'g'ridan-to'g'ri Amazon tomonidan boshqariladigan veb-xizmat.
Kafka serverini o'rnatish va ishga tushirish
Kafkani to'g'ridan-to'g'ri ishlatishdan oldin sizda Java borligiga ishonch hosil qilishingiz kerak, chunki... JVM ish uchun ishlatiladi:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Keyingi qadam ixtiyoriy. Gap shundaki, standart sozlamalar Apache Kafkaning barcha imkoniyatlaridan to‘liq foydalanishga imkon bermaydi. Masalan, xabarlar chop etilishi mumkin bo'lgan mavzuni, toifani, guruhni o'chiring. Buni o'zgartirish uchun konfiguratsiya faylini tahrirlaymiz:
vim ~/kafka/config/server.properties
Fayl oxiriga quyidagilarni qo'shing:
delete.topic.enable = true
Kafka serverini ishga tushirishdan oldin ZooKeeper serverini ishga tushirishingiz kerak, biz Kafka tarqatilishi bilan birga keladigan yordamchi skriptdan foydalanamiz:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper muvaffaqiyatli ishga tushirilgandan so'ng, Kafka serverini alohida terminalda ishga tushiring:
Keling, yangi yaratilgan mavzu uchun ishlab chiqaruvchi va iste'molchini sinab ko'rish daqiqalarini o'tkazib yuboraylik. Xabarlarni yuborish va qabul qilishni qanday sinab ko'rishingiz mumkinligi haqida batafsil ma'lumot rasmiy hujjatlarda yozilgan - Ba'zi xabarlarni yuboring. Xo'sh, biz KafkaProducer API yordamida Python-da prodyuser yozishga o'tamiz.
Ishlab chiqaruvchi yozish
Ishlab chiqaruvchi tasodifiy ma'lumotlarni yaratadi - har soniyada 100 ta xabar. Tasodifiy ma'lumotlar deganda biz uchta maydondan iborat lug'atni tushunamiz:
filial - kredit tashkilotining savdo nuqtasining nomi;
Valyutalar — muomala valyutasi;
miqdor - tranzaksiya summasi. Agar bu bank tomonidan valyuta sotib olingan bo'lsa, bu miqdor ijobiy raqam bo'ladi va agar u savdo bo'lsa, salbiy raqam bo'ladi.
Ishlab chiqaruvchining kodi quyidagicha ko'rinadi:
Keyinchalik, jo'natish usulidan foydalanib, biz serverga kerakli mavzuga JSON formatida xabar yuboramiz:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Skriptni ishga tushirishda biz terminalda quyidagi xabarlarni olamiz:
Bu shuni anglatadiki, hamma narsa biz xohlagandek ishlaydi - ishlab chiqaruvchi bizga kerak bo'lgan mavzu bo'yicha xabarlarni yaratadi va yuboradi.
Keyingi qadam Spark-ni o'rnatish va ushbu xabar oqimini qayta ishlashdir.
Apache Spark o'rnatilmoqda
Apache Spark universal va yuqori unumli klasterli hisoblash platformasidir.
Spark MapReduce modelining mashhur ilovalariga qaraganda yaxshiroq ishlaydi, shu bilan birga kengroq hisoblash turlarini, jumladan, interaktiv so‘rovlar va oqimlarni qayta ishlashni qo‘llab-quvvatlaydi. Tezlik katta hajmdagi ma'lumotlarni qayta ishlashda muhim rol o'ynaydi, chunki bu tezlik sizga daqiqalar yoki soatlar kutmasdan interaktiv ishlash imkonini beradi. Spark-ning eng katta kuchli tomonlaridan biri uni juda tez qiladi - uning xotirada hisob-kitoblarni amalga oshirish qobiliyati.
Ushbu ramka Scala-da yozilgan, shuning uchun avval uni o'rnatishingiz kerak:
sudo apt-get install scala
Rasmiy veb-saytdan Spark distributivini yuklab oling:
bashrc-ga o'zgartirish kiritgandan so'ng quyidagi buyruqni bajaring:
source ~/.bashrc
AWS PostgreSQL-ni o'rnatish
Biz oqimlardan qayta ishlangan ma'lumotlarni yuklaydigan ma'lumotlar bazasini joylashtirish qoladi. Buning uchun biz AWS RDS xizmatidan foydalanamiz.
AWS konsoliga o'ting -> AWS RDS -> Ma'lumotlar bazalari -> Ma'lumotlar bazasini yaratish:
PostgreSQL-ni tanlang va Keyingiga bosing:
Chunki Ushbu misol faqat ta'lim maqsadlarida; biz "minimal" bepul serverdan foydalanamiz (Bepul daraja):
Keyinchalik, biz Free Tier blokiga belgi qo'yamiz va shundan so'ng bizga avtomatik ravishda t2.micro sinfining namunasi taklif qilinadi - zaif bo'lsa ham, u bepul va bizning vazifamiz uchun juda mos keladi:
Keyinchalik juda muhim narsalar keladi: ma'lumotlar bazasi namunasining nomi, asosiy foydalanuvchi nomi va uning paroli. Keling, misolni nomlaylik: myHabrTest, asosiy foydalanuvchi: habr, parol: habr12345 va Keyingi tugmasini bosing:
Keyingi sahifada ma'lumotlar bazasi serverimizga tashqaridan kirish imkoniyati (Ommaviy foydalanish imkoniyati) va port mavjudligi uchun mas'ul bo'lgan parametrlar mavjud:
Keling, VPC xavfsizlik guruhi uchun yangi sozlamani yarataylik, bu bizning ma'lumotlar bazasi serverimizga 5432 (PostgreSQL) porti orqali tashqi kirish imkonini beradi.
Keling, AWS konsoliga alohida brauzer oynasida VPC boshqaruv paneli -> Xavfsizlik guruhlari -> Xavfsizlik guruhini yaratish bo'limiga o'tamiz:
Biz Xavfsizlik guruhi nomini o'rnatdik - PostgreSQL, tavsif, ushbu guruh qaysi VPC bilan bog'lanishi kerakligini ko'rsating va Yaratish tugmasini bosing:
Quyidagi rasmda ko'rsatilganidek, yangi yaratilgan guruh uchun 5432-port uchun kirish qoidalarini to'ldiring. Siz portni qo'lda ko'rsata olmaysiz, lekin Turi ochiladigan ro'yxatidan PostgreSQL-ni tanlang.
To'g'ri aytganda, ::/0 qiymati butun dunyo bo'ylab serverga kiruvchi trafik mavjudligini anglatadi, bu kanonik jihatdan mutlaqo to'g'ri emas, ammo misolni tahlil qilish uchun o'zimizga ushbu yondashuvdan foydalanishga ruxsat beraylik:
Biz brauzer sahifasiga qaytamiz, u erda bizda "Kengaytirilgan sozlamalarni sozlash" ochiladi va VPC xavfsizlik guruhlari bo'limida tanlang -> Mavjud VPC xavfsizlik guruhlarini tanlang -> PostgreSQL:
Keyin, Ma'lumotlar bazasi parametrlarida -> Ma'lumotlar bazasi nomi -> nomni o'rnating - habrDB.
Qolgan parametrlarni qoldirishimiz mumkin, sukut boʻyicha zaxira nusxasini oʻchirish (zaxirani saqlash muddati – 0 kun), monitoring va Performance Insights bundan mustasno. Tugmani bosing Ma'lumotlar bazasini yaratish:
Ip ishlov beruvchisi
Yakuniy bosqich har ikki soniyada Kafkadan keladigan yangi ma'lumotlarni qayta ishlovchi va natijani ma'lumotlar bazasiga kiritadigan Spark ishini ishlab chiqish bo'ladi.
Yuqorida ta'kidlab o'tilganidek, nazorat nuqtalari SparkStreaming-ning asosiy mexanizmi bo'lib, ular nosozliklarga chidamliligini ta'minlash uchun sozlanishi kerak. Biz nazorat punktlaridan foydalanamiz va agar protsedura muvaffaqiyatsiz bo'lsa, Spark Streaming moduli yo'qolgan ma'lumotlarni qayta tiklash uchun faqat oxirgi nazorat punktiga qaytishi va undan hisob-kitoblarni davom ettirishi kerak bo'ladi.
Tekshirish punkti ma'lumotlari saqlanadigan xatoga chidamli, ishonchli fayl tizimida (masalan, HDFS, S3 va boshqalar) katalogni o'rnatish orqali tekshirishni yoqish mumkin. Bu, masalan, yordamida amalga oshiriladi:
streamingContext.checkpoint(checkpointDirectory)
Bizning misolimizda biz quyidagi yondashuvdan foydalanamiz, ya'ni agar checkpointDirectory mavjud bo'lsa, kontekst tekshiruv punkti ma'lumotlaridan qayta yaratiladi. Agar katalog mavjud bo'lmasa (ya'ni birinchi marta bajarilgan bo'lsa), yangi kontekst yaratish va DStreams-ni sozlash uchun functionToCreateContext chaqiriladi:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils kutubxonasining createDirectStream usuli yordamida “tranzaksiya” mavzusiga ulanish uchun DirectStream obyektini yaratamiz:
Spark SQL-dan foydalanib, biz oddiy guruhlashni amalga oshiramiz va natijani konsolda ko'rsatamiz:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
So'rov matnini olish va uni Spark SQL orqali ishga tushirish:
Va keyin biz olingan jamlangan ma'lumotlarni AWS RDS jadvaliga saqlaymiz. Birlashtirish natijalarini ma'lumotlar bazasi jadvaliga saqlash uchun DataFrame obyektining yozish usulidan foydalanamiz:
AWS RDS ga ulanishni o'rnatish haqida bir necha so'z. Biz uning uchun foydalanuvchi va parolni “AWS PostgreSQL-ni joylashtirish” bosqichida yaratdik. Ulanish va xavfsizlik bo'limida ko'rsatilgan ma'lumotlar bazasi serverining URL manzili sifatida siz Endpoint dan foydalanishingiz kerak:
Spark va Kafkani to'g'ri ulash uchun siz artefakt yordamida smark-submit orqali ishni bajarishingiz kerak. spark-streaming-kafka-0-8_2.11. Bundan tashqari, biz PostgreSQL ma'lumotlar bazasi bilan o'zaro ishlash uchun artefaktdan ham foydalanamiz; biz ularni --packages orqali uzatamiz.
Skriptning moslashuvchanligi uchun biz kirish parametrlari sifatida xabarlar serverining nomini va biz ma'lumot olishni istagan mavzuni ham kiritamiz.
Shunday qilib, tizimni ishga tushirish va uning ishlashini tekshirish vaqti keldi:
Hammasi chiqdi! Quyidagi rasmda ko'rib turganingizdek, dastur ishlayotgan vaqtda har 2 soniyada yangi yig'ish natijalari chiqariladi, chunki biz StreamingContext obyektini yaratganimizda paketlash oralig'ini 2 soniyaga o'rnatdik:
Keyinchalik, jadvaldagi yozuvlar mavjudligini tekshirish uchun ma'lumotlar bazasiga oddiy so'rov o'tkazamiz tranzaksiya_oqimi:
xulosa
Ushbu maqola Apache Kafka va PostgreSQL bilan birgalikda Spark Streaming yordamida ma'lumotlarni oqim bilan qayta ishlash misolini ko'rib chiqdi. Turli manbalardan olingan ma'lumotlarning o'sishi bilan oqim va real vaqt rejimida ilovalarni yaratish uchun Spark Streamingning amaliy qiymatini ortiqcha baholash qiyin.
To'liq manba kodini mening omborimda topishingiz mumkin GitHub.
Men ushbu maqolani muhokama qilishdan xursandman, sharhlaringizni kutaman, shuningdek, barcha g'amxo'r o'quvchilarning konstruktiv tanqidiga umid qilaman.
Omad tilayman!
. Dastlab mahalliy PostgreSQL ma'lumotlar bazasidan foydalanish rejalashtirilgan edi, lekin AWSga bo'lgan muhabbatimni hisobga olib, ma'lumotlar bazasini bulutga ko'chirishga qaror qildim. Ushbu mavzu bo'yicha keyingi maqolada men AWS Kinesis va AWS EMR yordamida yuqorida tavsiflangan butun tizimni AWS da qanday amalga oshirishni ko'rsataman. Yangiliklarni kuzatib boring!