Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Salom, Xabr! Bugun biz Spark Streaming yordamida Apache Kafka xabar oqimlarini qayta ishlaydigan tizimni quramiz va ishlov berish natijalarini AWS RDS bulutli ma'lumotlar bazasiga yozamiz.

Tasavvur qilaylik, ma'lum bir kredit tashkiloti bizga o'zining barcha filiallari bo'ylab kiruvchi tranzaktsiyalarni "ucharda" qayta ishlash vazifasini qo'yadi. Bu xazina uchun ochiq valyuta pozitsiyasini, operatsiyalar bo'yicha limitlarni yoki moliyaviy natijalarni va hokazolarni tezkor hisoblash maqsadida amalga oshirilishi mumkin.

Bu ishni sehrli va sehrli afsunlardan foydalanmasdan qanday amalga oshirish kerak - kesish ostida o'qing! Bor!

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash
(Rasm manbai)

kirish

Albatta, real vaqt rejimida katta hajmdagi ma’lumotlarni qayta ishlash zamonaviy tizimlarda foydalanish uchun keng imkoniyatlar yaratadi. Buning uchun eng mashhur kombinatsiyalardan biri Apache Kafka va Spark Streaming tandemidir, bu yerda Kafka kiruvchi xabarlar paketlari oqimini yaratadi va Spark Streaming bu paketlarni ma'lum vaqt oralig'ida qayta ishlaydi.

Ilovaning xatolarga chidamliligini oshirish uchun biz nazorat nuqtalaridan foydalanamiz. Ushbu mexanizm yordamida Spark Streaming mexanizmi yo'qolgan ma'lumotlarni qayta tiklashi kerak bo'lganda, u faqat oxirgi nazorat nuqtasiga qaytib, u erdan hisob-kitoblarni davom ettirishi kerak.

Rivojlangan tizimning arxitekturasi

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Ishlatilgan komponentlar:

  • Apache Kafka tarqatilgan nashr-obuna xabarlar tizimidir. Ham oflayn, ham onlayn xabarlarni iste'mol qilish uchun javob beradi. Ma'lumotlar yo'qotilishining oldini olish uchun Kafka xabarlari diskda saqlanadi va klaster ichida takrorlanadi. Kafka tizimi ZooKeeper sinxronizatsiya xizmati ustiga qurilgan;
  • Apache Spark Streaming - Oqimli ma'lumotlarni qayta ishlash uchun Spark komponenti. Spark Streaming moduli mikro-to'plamli arxitekturadan foydalangan holda qurilgan, bu erda ma'lumotlar oqimi kichik ma'lumotlar paketlarining uzluksiz ketma-ketligi sifatida talqin etiladi. Spark Streaming turli manbalardan ma'lumotlarni oladi va ularni kichik paketlarga birlashtiradi. Yangi paketlar muntazam ravishda yaratiladi. Har bir vaqt oralig'ining boshida yangi paket yaratiladi va shu vaqt oralig'ida olingan har qanday ma'lumotlar paketga kiritiladi. Intervalning oxirida paketlar o'sishi to'xtaydi. Intervalning o'lchami partiya oralig'i deb ataladigan parametr bilan aniqlanadi;
  • Apache Spark SQL - relyatsion ishlov berishni Spark funktsional dasturlash bilan birlashtiradi. Strukturaviy ma'lumotlar deganda sxemaga ega bo'lgan ma'lumotlar, ya'ni barcha yozuvlar uchun yagona maydonlar to'plami tushuniladi. Spark SQL turli tuzilgan ma'lumotlar manbalaridan kiritishni qo'llab-quvvatlaydi va sxema ma'lumotlarining mavjudligi tufayli u faqat kerakli yozuv maydonlarini samarali ravishda olishi mumkin, shuningdek DataFrame API'larini taqdim etadi;
  • AWS RDS nisbatan arzon bulutga asoslangan relyatsion ma'lumotlar bazasi, sozlash, ishlatish va masshtablashni soddalashtiradigan va to'g'ridan-to'g'ri Amazon tomonidan boshqariladigan veb-xizmat.

Kafka serverini o'rnatish va ishga tushirish

Kafkani to'g'ridan-to'g'ri ishlatishdan oldin sizda Java borligiga ishonch hosil qilishingiz kerak, chunki... JVM ish uchun ishlatiladi:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Keling, Kafka bilan ishlash uchun yangi foydalanuvchi yarataylik:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Keyin, tarqatishni rasmiy Apache Kafka veb-saytidan yuklab oling:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Yuklab olingan arxivni oching:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Keyingi qadam ixtiyoriy. Gap shundaki, standart sozlamalar Apache Kafkaning barcha imkoniyatlaridan to‘liq foydalanishga imkon bermaydi. Masalan, xabarlar chop etilishi mumkin bo'lgan mavzuni, toifani, guruhni o'chiring. Buni o'zgartirish uchun konfiguratsiya faylini tahrirlaymiz:

vim ~/kafka/config/server.properties

Fayl oxiriga quyidagilarni qo'shing:

delete.topic.enable = true

Kafka serverini ishga tushirishdan oldin ZooKeeper serverini ishga tushirishingiz kerak, biz Kafka tarqatilishi bilan birga keladigan yordamchi skriptdan foydalanamiz:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper muvaffaqiyatli ishga tushirilgandan so'ng, Kafka serverini alohida terminalda ishga tushiring:

bin/kafka-server-start.sh config/server.properties

Tranzaksiya deb nomlangan yangi mavzu yaratamiz:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Kerakli bo'limlar va replikatsiyalar soniga ega mavzu yaratilganligiga ishonch hosil qilaylik:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keling, yangi yaratilgan mavzu uchun ishlab chiqaruvchi va iste'molchini sinab ko'rish daqiqalarini o'tkazib yuboraylik. Xabarlarni yuborish va qabul qilishni qanday sinab ko'rishingiz mumkinligi haqida batafsil ma'lumot rasmiy hujjatlarda yozilgan - Ba'zi xabarlarni yuboring. Xo'sh, biz KafkaProducer API yordamida Python-da prodyuser yozishga o'tamiz.

Ishlab chiqaruvchi yozish

Ishlab chiqaruvchi tasodifiy ma'lumotlarni yaratadi - har soniyada 100 ta xabar. Tasodifiy ma'lumotlar deganda biz uchta maydondan iborat lug'atni tushunamiz:

  • filial - kredit tashkilotining savdo nuqtasining nomi;
  • Valyutalar — muomala valyutasi;
  • miqdor - tranzaksiya summasi. Agar bu bank tomonidan valyuta sotib olingan bo'lsa, bu miqdor ijobiy raqam bo'ladi va agar u savdo bo'lsa, salbiy raqam bo'ladi.

Ishlab chiqaruvchining kodi quyidagicha ko'rinadi:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Keyinchalik, jo'natish usulidan foydalanib, biz serverga kerakli mavzuga JSON formatida xabar yuboramiz:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Skriptni ishga tushirishda biz terminalda quyidagi xabarlarni olamiz:

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Bu shuni anglatadiki, hamma narsa biz xohlagandek ishlaydi - ishlab chiqaruvchi bizga kerak bo'lgan mavzu bo'yicha xabarlarni yaratadi va yuboradi.
Keyingi qadam Spark-ni o'rnatish va ushbu xabar oqimini qayta ishlashdir.

Apache Spark o'rnatilmoqda

Apache Spark universal va yuqori unumli klasterli hisoblash platformasidir.

Spark MapReduce modelining mashhur ilovalariga qaraganda yaxshiroq ishlaydi, shu bilan birga kengroq hisoblash turlarini, jumladan, interaktiv so‘rovlar va oqimlarni qayta ishlashni qo‘llab-quvvatlaydi. Tezlik katta hajmdagi ma'lumotlarni qayta ishlashda muhim rol o'ynaydi, chunki bu tezlik sizga daqiqalar yoki soatlar kutmasdan interaktiv ishlash imkonini beradi. Spark-ning eng katta kuchli tomonlaridan biri uni juda tez qiladi - uning xotirada hisob-kitoblarni amalga oshirish qobiliyati.

Ushbu ramka Scala-da yozilgan, shuning uchun avval uni o'rnatishingiz kerak:

sudo apt-get install scala

Rasmiy veb-saytdan Spark distributivini yuklab oling:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Arxivni ochish:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Bash fayliga Spark yo'lini qo'shing:

vim ~/.bashrc

Tahrirlovchi orqali quyidagi qatorlarni qo'shing:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc-ga o'zgartirish kiritgandan so'ng quyidagi buyruqni bajaring:

source ~/.bashrc

AWS PostgreSQL-ni o'rnatish

Biz oqimlardan qayta ishlangan ma'lumotlarni yuklaydigan ma'lumotlar bazasini joylashtirish qoladi. Buning uchun biz AWS RDS xizmatidan foydalanamiz.

AWS konsoliga o'ting -> AWS RDS -> Ma'lumotlar bazalari -> Ma'lumotlar bazasini yaratish:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

PostgreSQL-ni tanlang va Keyingiga bosing:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Chunki Ushbu misol faqat ta'lim maqsadlarida; biz "minimal" bepul serverdan foydalanamiz (Bepul daraja):
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keyinchalik, biz Free Tier blokiga belgi qo'yamiz va shundan so'ng bizga avtomatik ravishda t2.micro sinfining namunasi taklif qilinadi - zaif bo'lsa ham, u bepul va bizning vazifamiz uchun juda mos keladi:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keyinchalik juda muhim narsalar keladi: ma'lumotlar bazasi namunasining nomi, asosiy foydalanuvchi nomi va uning paroli. Keling, misolni nomlaylik: myHabrTest, asosiy foydalanuvchi: habr, parol: habr12345 va Keyingi tugmasini bosing:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keyingi sahifada ma'lumotlar bazasi serverimizga tashqaridan kirish imkoniyati (Ommaviy foydalanish imkoniyati) va port mavjudligi uchun mas'ul bo'lgan parametrlar mavjud:

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keling, VPC xavfsizlik guruhi uchun yangi sozlamani yarataylik, bu bizning ma'lumotlar bazasi serverimizga 5432 (PostgreSQL) porti orqali tashqi kirish imkonini beradi.
Keling, AWS konsoliga alohida brauzer oynasida VPC boshqaruv paneli -> Xavfsizlik guruhlari -> Xavfsizlik guruhini yaratish bo'limiga o'tamiz:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Biz Xavfsizlik guruhi nomini o'rnatdik - PostgreSQL, tavsif, ushbu guruh qaysi VPC bilan bog'lanishi kerakligini ko'rsating va Yaratish tugmasini bosing:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Quyidagi rasmda ko'rsatilganidek, yangi yaratilgan guruh uchun 5432-port uchun kirish qoidalarini to'ldiring. Siz portni qo'lda ko'rsata olmaysiz, lekin Turi ochiladigan ro'yxatidan PostgreSQL-ni tanlang.

To'g'ri aytganda, ::/0 qiymati butun dunyo bo'ylab serverga kiruvchi trafik mavjudligini anglatadi, bu kanonik jihatdan mutlaqo to'g'ri emas, ammo misolni tahlil qilish uchun o'zimizga ushbu yondashuvdan foydalanishga ruxsat beraylik:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Biz brauzer sahifasiga qaytamiz, u erda bizda "Kengaytirilgan sozlamalarni sozlash" ochiladi va VPC xavfsizlik guruhlari bo'limida tanlang -> Mavjud VPC xavfsizlik guruhlarini tanlang -> PostgreSQL:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keyin, Ma'lumotlar bazasi parametrlarida -> Ma'lumotlar bazasi nomi -> nomni o'rnating - habrDB.

Qolgan parametrlarni qoldirishimiz mumkin, sukut boʻyicha zaxira nusxasini oʻchirish (zaxirani saqlash muddati – 0 kun), monitoring va Performance Insights bundan mustasno. Tugmani bosing Ma'lumotlar bazasini yaratish:
Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Ip ishlov beruvchisi

Yakuniy bosqich har ikki soniyada Kafkadan keladigan yangi ma'lumotlarni qayta ishlovchi va natijani ma'lumotlar bazasiga kiritadigan Spark ishini ishlab chiqish bo'ladi.

Yuqorida ta'kidlab o'tilganidek, nazorat nuqtalari SparkStreaming-ning asosiy mexanizmi bo'lib, ular nosozliklarga chidamliligini ta'minlash uchun sozlanishi kerak. Biz nazorat punktlaridan foydalanamiz va agar protsedura muvaffaqiyatsiz bo'lsa, Spark Streaming moduli yo'qolgan ma'lumotlarni qayta tiklash uchun faqat oxirgi nazorat punktiga qaytishi va undan hisob-kitoblarni davom ettirishi kerak bo'ladi.

Tekshirish punkti ma'lumotlari saqlanadigan xatoga chidamli, ishonchli fayl tizimida (masalan, HDFS, S3 va boshqalar) katalogni o'rnatish orqali tekshirishni yoqish mumkin. Bu, masalan, yordamida amalga oshiriladi:

streamingContext.checkpoint(checkpointDirectory)

Bizning misolimizda biz quyidagi yondashuvdan foydalanamiz, ya'ni agar checkpointDirectory mavjud bo'lsa, kontekst tekshiruv punkti ma'lumotlaridan qayta yaratiladi. Agar katalog mavjud bo'lmasa (ya'ni birinchi marta bajarilgan bo'lsa), yangi kontekst yaratish va DStreams-ni sozlash uchun functionToCreateContext chaqiriladi:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils kutubxonasining createDirectStream usuli yordamida “tranzaksiya” mavzusiga ulanish uchun DirectStream obyektini yaratamiz:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON formatida kiruvchi ma'lumotlarni tahlil qilish:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL-dan foydalanib, biz oddiy guruhlashni amalga oshiramiz va natijani konsolda ko'rsatamiz:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

So'rov matnini olish va uni Spark SQL orqali ishga tushirish:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Va keyin biz olingan jamlangan ma'lumotlarni AWS RDS jadvaliga saqlaymiz. Birlashtirish natijalarini ma'lumotlar bazasi jadvaliga saqlash uchun DataFrame obyektining yozish usulidan foydalanamiz:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS ga ulanishni o'rnatish haqida bir necha so'z. Biz uning uchun foydalanuvchi va parolni “AWS PostgreSQL-ni joylashtirish” bosqichida yaratdik. Ulanish va xavfsizlik bo'limida ko'rsatilgan ma'lumotlar bazasi serverining URL manzili sifatida siz Endpoint dan foydalanishingiz kerak:

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Spark va Kafkani to'g'ri ulash uchun siz artefakt yordamida smark-submit orqali ishni bajarishingiz kerak. spark-streaming-kafka-0-8_2.11. Bundan tashqari, biz PostgreSQL ma'lumotlar bazasi bilan o'zaro ishlash uchun artefaktdan ham foydalanamiz; biz ularni --packages orqali uzatamiz.

Skriptning moslashuvchanligi uchun biz kirish parametrlari sifatida xabarlar serverining nomini va biz ma'lumot olishni istagan mavzuni ham kiritamiz.

Shunday qilib, tizimni ishga tushirish va uning ishlashini tekshirish vaqti keldi:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Hammasi chiqdi! Quyidagi rasmda ko'rib turganingizdek, dastur ishlayotgan vaqtda har 2 soniyada yangi yig'ish natijalari chiqariladi, chunki biz StreamingContext obyektini yaratganimizda paketlash oralig'ini 2 soniyaga o'rnatdik:

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

Keyinchalik, jadvaldagi yozuvlar mavjudligini tekshirish uchun ma'lumotlar bazasiga oddiy so'rov o'tkazamiz tranzaksiya_oqimi:

Apache Kafka va Spark Streaming bilan oqimli ma'lumotlarni qayta ishlash

xulosa

Ushbu maqola Apache Kafka va PostgreSQL bilan birgalikda Spark Streaming yordamida ma'lumotlarni oqim bilan qayta ishlash misolini ko'rib chiqdi. Turli manbalardan olingan ma'lumotlarning o'sishi bilan oqim va real vaqt rejimida ilovalarni yaratish uchun Spark Streamingning amaliy qiymatini ortiqcha baholash qiyin.

To'liq manba kodini mening omborimda topishingiz mumkin GitHub.

Men ushbu maqolani muhokama qilishdan xursandman, sharhlaringizni kutaman, shuningdek, barcha g'amxo'r o'quvchilarning konstruktiv tanqidiga umid qilaman.

Omad tilayman!

. Dastlab mahalliy PostgreSQL ma'lumotlar bazasidan foydalanish rejalashtirilgan edi, lekin AWSga bo'lgan muhabbatimni hisobga olib, ma'lumotlar bazasini bulutga ko'chirishga qaror qildim. Ushbu mavzu bo'yicha keyingi maqolada men AWS Kinesis va AWS EMR yordamida yuqorida tavsiflangan butun tizimni AWS da qanday amalga oshirishni ko'rsataman. Yangiliklarni kuzatib boring!

Manba: www.habr.com

a Izoh qo'shish