ProHoster > Blog > İdarə > Apache Kafka və Spark Streaming ilə Data Streaming
Apache Kafka və Spark Streaming ilə Data Streaming
Hey Habr! Bu gün biz Spark Streaming istifadə edərək Apache Kafka mesaj axınlarını emal edəcək və emal nəticəsini AWS RDS bulud verilənlər bazasına yazacaq bir sistem quracağıq.
Təsəvvür edin ki, müəyyən bir kredit təşkilatı bizə bütün filialları üçün daxil olan əməliyyatları "tezliklə" emal etmək vəzifəsini qoyur. Bu, xəzinədarlıq üçün açıq valyuta mövqeyini, əməliyyatlar üzrə limitləri və ya maliyyə nəticəsini və s. tez hesablanması məqsədilə edilə bilər.
Sehrli və sehrli sehrlərdən istifadə etmədən bu işi necə həyata keçirmək olar - kəsik altında oxuyun! Get!
Təbii ki, böyük həcmdə verilənlərin real vaxt rejimində işlənməsi müasir sistemlərdə istifadə üçün geniş imkanlar yaradır. Bunun üçün ən məşhur kombinasiyalardan biri Apache Kafka və Spark Streaming tandemidir, burada Kafka gələn mesaj paketləri axını yaradır və Spark Streaming bu paketləri müəyyən vaxt intervalında emal edir.
Tətbiqin nasazlıqlara qarşı dözümlülüyünü yaxşılaşdırmaq üçün yoxlama nöqtələrindən - yoxlama məntəqələrindən istifadə edəcəyik. Bu mexanizmlə, Spark Streaming modulunun itirilmiş məlumatları bərpa etməsi lazım olduqda, yalnız sonuncu yoxlama nöqtəsinə qayıtmaq və oradan hesablamaları davam etdirmək lazımdır.
İnkişaf etmiş sistemin arxitekturası
İstifadə olunan komponentlər:
Apaçi Kafka paylanmış dərc və abunə mesajlaşma sistemidir. Həm oflayn, həm də onlayn mesaj istehlakı üçün uyğundur. Məlumat itkisinin qarşısını almaq üçün Kafka mesajları diskdə saxlanılır və klaster daxilində təkrarlanır. Kafka sistemi ZooKeeper sinxronizasiya xidmətinin üzərində qurulub;
Apache Spark Streaming - axın məlumatlarının işlənməsi üçün Spark komponenti. Spark Streaming modulu məlumat axını kiçik məlumat paketlərinin davamlı ardıcıllığı kimi şərh edildikdə mikro toplu arxitekturasından istifadə etməklə qurulur. Spark Streaming müxtəlif mənbələrdən məlumatları götürür və kiçik partiyalarda birləşdirir. Mütəmadi olaraq yeni paketlər yaradılır. Hər vaxt intervalının başlanğıcında yeni paket yaradılır və bu intervalda alınan hər hansı məlumat paketə daxil edilir. Aralığın sonunda paket artımı dayanır. İntervalın ölçüsü partiya intervalı adlanan parametrlə müəyyən edilir;
Apache Spark SQL - Əlaqəli emal ilə Spark funksional proqramlaşdırmanı birləşdirir. Strukturlaşdırılmış verilənlər dedikdə sxemi olan verilənlərə, yəni bütün qeydlər üçün vahid sahələr dəstinə aiddir. Spark SQL müxtəlif strukturlaşdırılmış məlumat mənbələrindən daxiletmələri dəstəkləyir və sxem məlumatlarının mövcudluğuna görə o, yalnız tələb olunan qeyd sahələrini səmərəli şəkildə əldə edə bilir, həmçinin DataFrame API-lərini təmin edir;
AWS RDS nisbətən ucuz bulud əsaslı relational verilənlər bazası, birbaşa Amazon tərəfindən idarə olunan quraşdırma, əməliyyat və miqyaslaşdırmanı asanlaşdıran veb xidmətidir.
Kafka serverinin quraşdırılması və işə salınması
Kafkadan birbaşa istifadə etməzdən əvvəl Java-nın olduğundan əmin olmalısınız, çünki JVM iş üçün istifadə olunur:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Növbəti addım isteğe bağlıdır. Fakt budur ki, standart parametrlər Apache Kafka-nın bütün xüsusiyyətlərindən tam istifadə etməyə imkan vermir. Məsələn, mesajların dərc oluna biləcəyi mövzunu, kateqoriyanı, qrupu silin. Bunu dəyişmək üçün konfiqurasiya faylını redaktə edək:
vim ~/kafka/config/server.properties
Faylın sonuna aşağıdakıları əlavə edin:
delete.topic.enable = true
Kafka serverinə başlamazdan əvvəl ZooKeeper serverini işə salmalısınız, biz Kafka paylanması ilə gələn köməkçi skriptdən istifadə edəcəyik:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper uğurla işə salındıqdan sonra Kafka serverini ayrıca terminalda işə salırıq:
Yeni yaradılan mövzu üçün istehsalçı və istehlakçının sınaqdan keçirildiyi anları əldən verək. Mesajların göndərilməsini və qəbulunu necə sınaqdan keçirə biləcəyiniz haqqında ətraflı məlumat rəsmi sənədlərdə yazılmışdır - Bəzi mesajlar göndərin. Yaxşı, biz KafkaProducer API istifadə edərək Python-da prodüser yazmağa davam edirik.
İstehsalçı yazısı
İstehsalçı təsadüfi məlumat yaradacaq - hər saniyədə 100 mesaj. Təsadüfi məlumatlar dedikdə üç sahədən ibarət lüğət nəzərdə tutulur:
Filial — kredit təşkilatının satış məntəqəsinin adı;
Valyuta — əməliyyat valyutası;
Məbləği - əməliyyat məbləği. Məbləğ Bank tərəfindən valyuta alışı olduqda müsbət, satış olduqda isə mənfi olacaq.
Sonra, göndərmə metodundan istifadə edərək, serverə, lazım olan mövzuya JSON formatında bir mesaj göndəririk:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Skripti işləyərkən terminalda aşağıdakı mesajları alırıq:
Bu o deməkdir ki, hər şey istədiyimiz kimi işləyir - prodüser bizə lazım olan mövzuya mesajlar yaradır və göndərir.
Növbəti addım Spark-ı quraşdırmaq və bu mesaj axınını emal etməkdir.
Apache Spark quraşdırılması
Apache Spark çox yönlü və yüksək performanslı klaster hesablama platformasıdır.
Spark, interaktiv sorğular və axın daxil olmaqla, daha geniş spektrli hesablama növləri üçün dəstək təmin etməklə yanaşı, performans baxımından MapReduce modelinin populyar tətbiqlərini üstələyir. Sürət böyük miqdarda məlumatların işlənməsi zamanı mühüm rol oynayır, çünki dəqiqələr və saatlar gözləmədən interaktiv işləməyə imkan verən sürətdir. Spark-ın bu sürəti çatdırmaq üçün ən böyük üstünlüklərindən biri yaddaşdaxili hesablamaları yerinə yetirmək qabiliyyətidir.
Bu çərçivə Scala ilə yazılmışdır, ona görə də əvvəlcə onu quraşdırmalısınız:
sudo apt-get install scala
Spark paylamasını rəsmi internet saytından yükləyin:
Çünki bu nümunə yalnız təhsil məqsədləri üçün təhlil edilir, biz "minimum" (Pulsuz Tier) pulsuz serverdən istifadə edəcəyik:
Sonra, Pulsuz Tier blokunu yoxlayın və bundan sonra bizə avtomatik olaraq t2.micro sinifinin nümunəsi təklif ediləcək - zəif olsa da, pulsuz və tapşırığımız üçün olduqca uyğundur:
Çox vacib şeylər izlənilir: DB instansiyasının adı, əsas istifadəçinin adı və onun parolu. Məsələni çağıraq: myHabrTest, master user: habr, parol: habr12345 və Next düyməsini basın:
Növbəti səhifədə verilənlər bazası serverimizin xaricdən əlçatanlığı (İctimai əlçatanlıq) və portların mövcudluğu üçün cavabdeh olan parametrlər var:
Gəlin VPC təhlükəsizlik qrupu üçün 5432 (PostgreSQL) portu vasitəsilə verilənlər bazası serverimizə xaricə daxil olmağa imkan verəcək yeni parametr yaradaq.
Gəlin ayrı brauzer pəncərəsində VPC İdarə Panelində AWS konsoluna gedək -> Təhlükəsizlik Qrupları -> Təhlükəsizlik qrupu yarat bölməsi:
Təhlükəsizlik qrupunun adını təyin etdik - PostgreSQL, təsvir, bu qrupun hansı VPC ilə əlaqələndirilməsini təyin edin və Yarat düyməsini basın:
Aşağıdakı şəkildə göstərildiyi kimi, 5432 portu üçün yeni yaradılmış qrup Gələn qaydaları doldururuq. Siz portu əl ilə təyin edə bilməzsiniz, lakin Tip açılan siyahısından PostgreSQL seçin.
Düzünü desək, ::/0 dəyəri serverə dünyanın hər yerindən daxil olan trafikin mövcudluğu deməkdir, bu, kanonik olaraq tamamilə doğru deyil, lakin nümunəni təhlil etmək üçün bu yanaşmadan istifadə edək:
Brauzer səhifəsinə qayıdırıq, burada "Qabaqcıl parametrləri konfiqurasiya et" bölməsini açırıq və bölmədə VPC təhlükəsizlik qruplarını seçin -> Mövcud VPC təhlükəsizlik qruplarını seçin -> PostgreSQL:
Sonra bölmədə Verilənlər bazası seçimləri -> Verilənlər bazasının adı -> adı təyin edin - habrDB.
Defolt olaraq ehtiyat nüsxəni (ehtiyatın saxlanma müddəti - 0 gün), monitorinq və Performans İnsightlarını söndürmək istisna olmaqla, qalan parametrləri tərk edə bilərik. Düyməni basın Verilənlər bazası yaradın:
Yayım İşləyicisi
Son mərhələ Kafkadan hər iki saniyədən bir yeni məlumatları emal edəcək və nəticəni verilənlər bazasına daxil edəcək Spark işinin hazırlanması olacaq.
Yuxarıda qeyd edildiyi kimi, yoxlama məntəqələri SparkStreaming-də səhvlərə dözümlülüyü təmin etmək üçün konfiqurasiya edilməli olan əsas mexanizmdir. Biz yoxlama nöqtələrindən istifadə edəcəyik və prosedur uğursuz olarsa, Spark Streaming modulu itirilmiş məlumatları bərpa etmək üçün yalnız sonuncu yoxlama məntəqəsinə qayıtmalı və ondan hesablamaları davam etdirməlidir.
Yoxlama məntəqəsi məlumatın saxlanacağı xətaya dözümlü, etibarlı fayl sistemində (məsələn, HDFS, S3 və s.) kataloq qurmaqla aktivləşdirilə bilər. Bu, məsələn:
streamingContext.checkpoint(checkpointDirectory)
Nümunəmizdə aşağıdakı yanaşmadan istifadə edəcəyik, yəni checkpointDirectory varsa, kontekst yoxlama nöqtəsi məlumatlarından yenidən yaradılacaq. Əgər kataloq mövcud deyilsə (yəni ilk dəfə icra olunur), onda yeni kontekst yaratmaq və DStreams qurmaq üçün functionToCreateContext funksiyası çağırılır:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils kitabxanasının createDirectStream metodundan istifadə edərək "əməliyyat" mövzusuna qoşulmaq üçün DirectStream obyekti yaradırıq:
Spark SQL istifadə edərək, biz sadə qruplaşdırma aparırıq və nəticəni konsola çıxarırıq:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Sorğunun gövdəsini əldə etmək və onu Spark SQL vasitəsilə idarə etmək:
Və sonra biz alınan məcmu məlumatları AWS RDS-də cədvəldə saxlayırıq. Ümumiləşdirmənin nəticələrini verilənlər bazası cədvəlində saxlamaq üçün DataFrame obyektinin yazma metodundan istifadə edəcəyik:
AWS RDS ilə əlaqə qurmaq haqqında bir neçə söz. Bunun üçün istifadəçi və parolu “AWS PostgreSQL-in yerləşdirilməsi” addımında yaratdıq. Verilənlər bazası serverinin url-i olaraq, Bağlantı və təhlükəsizlik bölməsində göstərilən Son nöqtədən istifadə etməlisiniz:
Spark və Kafkanı düzgün birləşdirmək üçün işi artefaktdan istifadə edərək smark-submit vasitəsilə yerinə yetirməlisiniz. spark-streaming-kafka-0-8_2.11. Bundan əlavə, biz PostgreSQL verilənlər bazası ilə qarşılıqlı əlaqə üçün artefaktdan da istifadə edəcəyik, onları --paketlər vasitəsilə keçirəcəyik.
Skriptin çevikliyi üçün, həmçinin giriş parametrləri kimi mesaj serverinin adını və məlumat almaq istədiyimiz mövzunu çıxaracağıq.
Beləliklə, sistemi işə salmağın və sınaqdan keçirməyin vaxtı gəldi:
Hər şey alındı! Aşağıdakı şəkildə gördüyünüz kimi, proqram işləyərkən hər 2 saniyədən bir yeni toplama nəticələri göstərilir, çünki biz StreamingContext obyektini yaradanda paketləmə intervalını 2 saniyəyə təyin etmişik:
Sonra, cədvəldəki qeydləri yoxlamaq üçün verilənlər bazasına sadə bir sorğu edirik əməliyyat_axı:
Nəticə
Bu məqalədə Apache Kafka və PostgreSQL ilə birlikdə Spark Streaming-dən istifadə edərək axın məlumatlarının işlənməsi nümunəsi nəzərdən keçirilmişdir. Müxtəlif mənbələrdən alınan məlumatların həcminin artması ilə real vaxt rejimində və axın proqramları yaratmaq üçün Spark Streaming-in praktik dəyərini qiymətləndirmək çətindir.
Tam mənbə kodunu mənim depomda tapa bilərsiniz Github.
Bu məqaləni müzakirə etməkdən məmnunam, şərhlərinizi səbirsizliklə gözləyirəm, həmçinin bütün maraqlanan oxuculardan konstruktiv tənqidə ümid edirəm.
Uğurlar diləyirəm!
Məz. Əvvəlcə yerli PostgreSQL verilənlər bazasından istifadə etmək planlaşdırılırdı, lakin AWS-ə olan sevgimi nəzərə alaraq verilənlər bazasını buludlara köçürmək qərarına gəldim. Bu mövzu ilə bağlı növbəti məqalədə AWS Kinesis və AWS EMR-dən istifadə edərək yuxarıda təsvir edilən bütün sistemi AWS-də necə tətbiq edəcəyinizi sizə göstərəcəyəm. Xəbərləri izləyin!