Apache Kafka və Spark Streaming ilə Data Streaming

Hey Habr! Bu gün biz Spark Streaming istifadə edərək Apache Kafka mesaj axınlarını emal edəcək və emal nəticəsini AWS RDS bulud verilənlər bazasına yazacaq bir sistem quracağıq.

Təsəvvür edin ki, müəyyən bir kredit təşkilatı bizə bütün filialları üçün daxil olan əməliyyatları "tezliklə" emal etmək vəzifəsini qoyur. Bu, xəzinədarlıq üçün açıq valyuta mövqeyini, əməliyyatlar üzrə limitləri və ya maliyyə nəticəsini və s. tez hesablanması məqsədilə edilə bilər.

Sehrli və sehrli sehrlərdən istifadə etmədən bu işi necə həyata keçirmək olar - kəsik altında oxuyun! Get!

Apache Kafka və Spark Streaming ilə Data Streaming
(Şəkil mənbəyi)

Giriş

Təbii ki, böyük həcmdə verilənlərin real vaxt rejimində işlənməsi müasir sistemlərdə istifadə üçün geniş imkanlar yaradır. Bunun üçün ən məşhur kombinasiyalardan biri Apache Kafka və Spark Streaming tandemidir, burada Kafka gələn mesaj paketləri axını yaradır və Spark Streaming bu paketləri müəyyən vaxt intervalında emal edir.

Tətbiqin nasazlıqlara qarşı dözümlülüyünü yaxşılaşdırmaq üçün yoxlama nöqtələrindən - yoxlama məntəqələrindən istifadə edəcəyik. Bu mexanizmlə, Spark Streaming modulunun itirilmiş məlumatları bərpa etməsi lazım olduqda, yalnız sonuncu yoxlama nöqtəsinə qayıtmaq və oradan hesablamaları davam etdirmək lazımdır.

İnkişaf etmiş sistemin arxitekturası

Apache Kafka və Spark Streaming ilə Data Streaming

İstifadə olunan komponentlər:

  • Apaçi Kafka paylanmış dərc və abunə mesajlaşma sistemidir. Həm oflayn, həm də onlayn mesaj istehlakı üçün uyğundur. Məlumat itkisinin qarşısını almaq üçün Kafka mesajları diskdə saxlanılır və klaster daxilində təkrarlanır. Kafka sistemi ZooKeeper sinxronizasiya xidmətinin üzərində qurulub;
  • Apache Spark Streaming - axın məlumatlarının işlənməsi üçün Spark komponenti. Spark Streaming modulu məlumat axını kiçik məlumat paketlərinin davamlı ardıcıllığı kimi şərh edildikdə mikro toplu arxitekturasından istifadə etməklə qurulur. Spark Streaming müxtəlif mənbələrdən məlumatları götürür və kiçik partiyalarda birləşdirir. Mütəmadi olaraq yeni paketlər yaradılır. Hər vaxt intervalının başlanğıcında yeni paket yaradılır və bu intervalda alınan hər hansı məlumat paketə daxil edilir. Aralığın sonunda paket artımı dayanır. İntervalın ölçüsü partiya intervalı adlanan parametrlə müəyyən edilir;
  • Apache Spark SQL - Əlaqəli emal ilə Spark funksional proqramlaşdırmanı birləşdirir. Strukturlaşdırılmış verilənlər dedikdə sxemi olan verilənlərə, yəni bütün qeydlər üçün vahid sahələr dəstinə aiddir. Spark SQL müxtəlif strukturlaşdırılmış məlumat mənbələrindən daxiletmələri dəstəkləyir və sxem məlumatlarının mövcudluğuna görə o, yalnız tələb olunan qeyd sahələrini səmərəli şəkildə əldə edə bilir, həmçinin DataFrame API-lərini təmin edir;
  • AWS RDS nisbətən ucuz bulud əsaslı relational verilənlər bazası, birbaşa Amazon tərəfindən idarə olunan quraşdırma, əməliyyat və miqyaslaşdırmanı asanlaşdıran veb xidmətidir.

Kafka serverinin quraşdırılması və işə salınması

Kafkadan birbaşa istifadə etməzdən əvvəl Java-nın olduğundan əmin olmalısınız, çünki JVM iş üçün istifadə olunur:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Gəlin Kafka ilə işləmək üçün yeni istifadəçi yaradaq:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Sonra, paylama dəstini rəsmi Apache Kafka saytından endirin:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Yüklənmiş arxivi çıxarın:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Növbəti addım isteğe bağlıdır. Fakt budur ki, standart parametrlər Apache Kafka-nın bütün xüsusiyyətlərindən tam istifadə etməyə imkan vermir. Məsələn, mesajların dərc oluna biləcəyi mövzunu, kateqoriyanı, qrupu silin. Bunu dəyişmək üçün konfiqurasiya faylını redaktə edək:

vim ~/kafka/config/server.properties

Faylın sonuna aşağıdakıları əlavə edin:

delete.topic.enable = true

Kafka serverinə başlamazdan əvvəl ZooKeeper serverini işə salmalısınız, biz Kafka paylanması ilə gələn köməkçi skriptdən istifadə edəcəyik:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper uğurla işə salındıqdan sonra Kafka serverini ayrıca terminalda işə salırıq:

bin/kafka-server-start.sh config/server.properties

Tranzaksiya adlı yeni mövzu yaradaq:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Tələb olunan sayda bölmə və təkrarlama ilə mövzunun yaradıldığına əmin olaq:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka və Spark Streaming ilə Data Streaming

Yeni yaradılan mövzu üçün istehsalçı və istehlakçının sınaqdan keçirildiyi anları əldən verək. Mesajların göndərilməsini və qəbulunu necə sınaqdan keçirə biləcəyiniz haqqında ətraflı məlumat rəsmi sənədlərdə yazılmışdır - Bəzi mesajlar göndərin. Yaxşı, biz KafkaProducer API istifadə edərək Python-da prodüser yazmağa davam edirik.

İstehsalçı yazısı

İstehsalçı təsadüfi məlumat yaradacaq - hər saniyədə 100 mesaj. Təsadüfi məlumatlar dedikdə üç sahədən ibarət lüğət nəzərdə tutulur:

  • Filial — kredit təşkilatının satış məntəqəsinin adı;
  • Valyuta — əməliyyat valyutası;
  • Məbləği - əməliyyat məbləği. Məbləğ Bank tərəfindən valyuta alışı olduqda müsbət, satış olduqda isə mənfi olacaq.

İstehsalçı kodu belə görünür:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Sonra, göndərmə metodundan istifadə edərək, serverə, lazım olan mövzuya JSON formatında bir mesaj göndəririk:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Skripti işləyərkən terminalda aşağıdakı mesajları alırıq:

Apache Kafka və Spark Streaming ilə Data Streaming

Bu o deməkdir ki, hər şey istədiyimiz kimi işləyir - prodüser bizə lazım olan mövzuya mesajlar yaradır və göndərir.
Növbəti addım Spark-ı quraşdırmaq və bu mesaj axınını emal etməkdir.

Apache Spark quraşdırılması

Apache Spark çox yönlü və yüksək performanslı klaster hesablama platformasıdır.

Spark, interaktiv sorğular və axın daxil olmaqla, daha geniş spektrli hesablama növləri üçün dəstək təmin etməklə yanaşı, performans baxımından MapReduce modelinin populyar tətbiqlərini üstələyir. Sürət böyük miqdarda məlumatların işlənməsi zamanı mühüm rol oynayır, çünki dəqiqələr və saatlar gözləmədən interaktiv işləməyə imkan verən sürətdir. Spark-ın bu sürəti çatdırmaq üçün ən böyük üstünlüklərindən biri yaddaşdaxili hesablamaları yerinə yetirmək qabiliyyətidir.

Bu çərçivə Scala ilə yazılmışdır, ona görə də əvvəlcə onu quraşdırmalısınız:

sudo apt-get install scala

Spark paylamasını rəsmi internet saytından yükləyin:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Arxivi açın:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Spark yolunu bash faylına əlavə edin:

vim ~/.bashrc

Redaktor vasitəsilə aşağıdakı sətirləri əlavə edin:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc-ə dəyişiklik etdikdən sonra aşağıdakı əmri yerinə yetirin:

source ~/.bashrc

AWS PostgreSQL-in yerləşdirilməsi

Verilənlər bazasını genişləndirmək qalır, burada axınlardan işlənmiş məlumatları dolduracağıq. Bunun üçün biz AWS RDS xidmətindən istifadə edəcəyik.

AWS konsoluna keçin -> AWS RDS -> Verilənlər bazaları -> Verilənlər bazası yaradın:
Apache Kafka və Spark Streaming ilə Data Streaming

PostgreSQL seçin və Next düyməsini basın:
Apache Kafka və Spark Streaming ilə Data Streaming

Çünki bu nümunə yalnız təhsil məqsədləri üçün təhlil edilir, biz "minimum" (Pulsuz Tier) pulsuz serverdən istifadə edəcəyik:
Apache Kafka və Spark Streaming ilə Data Streaming

Sonra, Pulsuz Tier blokunu yoxlayın və bundan sonra bizə avtomatik olaraq t2.micro sinifinin nümunəsi təklif ediləcək - zəif olsa da, pulsuz və tapşırığımız üçün olduqca uyğundur:
Apache Kafka və Spark Streaming ilə Data Streaming

Çox vacib şeylər izlənilir: DB instansiyasının adı, əsas istifadəçinin adı və onun parolu. Məsələni çağıraq: myHabrTest, master user: habr, parol: habr12345 və Next düyməsini basın:
Apache Kafka və Spark Streaming ilə Data Streaming

Növbəti səhifədə verilənlər bazası serverimizin xaricdən əlçatanlığı (İctimai əlçatanlıq) və portların mövcudluğu üçün cavabdeh olan parametrlər var:

Apache Kafka və Spark Streaming ilə Data Streaming

Gəlin VPC təhlükəsizlik qrupu üçün 5432 (PostgreSQL) portu vasitəsilə verilənlər bazası serverimizə xaricə daxil olmağa imkan verəcək yeni parametr yaradaq.
Gəlin ayrı brauzer pəncərəsində VPC İdarə Panelində AWS konsoluna gedək -> Təhlükəsizlik Qrupları -> Təhlükəsizlik qrupu yarat bölməsi:
Apache Kafka və Spark Streaming ilə Data Streaming

Təhlükəsizlik qrupunun adını təyin etdik - PostgreSQL, təsvir, bu qrupun hansı VPC ilə əlaqələndirilməsini təyin edin və Yarat düyməsini basın:
Apache Kafka və Spark Streaming ilə Data Streaming

Aşağıdakı şəkildə göstərildiyi kimi, 5432 portu üçün yeni yaradılmış qrup Gələn qaydaları doldururuq. Siz portu əl ilə təyin edə bilməzsiniz, lakin Tip açılan siyahısından PostgreSQL seçin.

Düzünü desək, ::/0 dəyəri serverə dünyanın hər yerindən daxil olan trafikin mövcudluğu deməkdir, bu, kanonik olaraq tamamilə doğru deyil, lakin nümunəni təhlil etmək üçün bu yanaşmadan istifadə edək:
Apache Kafka və Spark Streaming ilə Data Streaming

Brauzer səhifəsinə qayıdırıq, burada "Qabaqcıl parametrləri konfiqurasiya et" bölməsini açırıq və bölmədə VPC təhlükəsizlik qruplarını seçin -> Mövcud VPC təhlükəsizlik qruplarını seçin -> PostgreSQL:
Apache Kafka və Spark Streaming ilə Data Streaming

Sonra bölmədə Verilənlər bazası seçimləri -> Verilənlər bazasının adı -> adı təyin edin - habrDB.

Defolt olaraq ehtiyat nüsxəni (ehtiyatın saxlanma müddəti - 0 gün), monitorinq və Performans İnsightlarını söndürmək istisna olmaqla, qalan parametrləri tərk edə bilərik. Düyməni basın Verilənlər bazası yaradın:
Apache Kafka və Spark Streaming ilə Data Streaming

Yayım İşləyicisi

Son mərhələ Kafkadan hər iki saniyədən bir yeni məlumatları emal edəcək və nəticəni verilənlər bazasına daxil edəcək Spark işinin hazırlanması olacaq.

Yuxarıda qeyd edildiyi kimi, yoxlama məntəqələri SparkStreaming-də səhvlərə dözümlülüyü təmin etmək üçün konfiqurasiya edilməli olan əsas mexanizmdir. Biz yoxlama nöqtələrindən istifadə edəcəyik və prosedur uğursuz olarsa, Spark Streaming modulu itirilmiş məlumatları bərpa etmək üçün yalnız sonuncu yoxlama məntəqəsinə qayıtmalı və ondan hesablamaları davam etdirməlidir.

Yoxlama məntəqəsi məlumatın saxlanacağı xətaya dözümlü, etibarlı fayl sistemində (məsələn, HDFS, S3 və s.) kataloq qurmaqla aktivləşdirilə bilər. Bu, məsələn:

streamingContext.checkpoint(checkpointDirectory)

Nümunəmizdə aşağıdakı yanaşmadan istifadə edəcəyik, yəni checkpointDirectory varsa, kontekst yoxlama nöqtəsi məlumatlarından yenidən yaradılacaq. Əgər kataloq mövcud deyilsə (yəni ilk dəfə icra olunur), onda yeni kontekst yaratmaq və DStreams qurmaq üçün functionToCreateContext funksiyası çağırılır:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils kitabxanasının createDirectStream metodundan istifadə edərək "əməliyyat" mövzusuna qoşulmaq üçün DirectStream obyekti yaradırıq:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON formatında daxil olan məlumatların təhlili:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL istifadə edərək, biz sadə qruplaşdırma aparırıq və nəticəni konsola çıxarırıq:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Sorğunun gövdəsini əldə etmək və onu Spark SQL vasitəsilə idarə etmək:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Və sonra biz alınan məcmu məlumatları AWS RDS-də cədvəldə saxlayırıq. Ümumiləşdirmənin nəticələrini verilənlər bazası cədvəlində saxlamaq üçün DataFrame obyektinin yazma metodundan istifadə edəcəyik:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS ilə əlaqə qurmaq haqqında bir neçə söz. Bunun üçün istifadəçi və parolu “AWS PostgreSQL-in yerləşdirilməsi” addımında yaratdıq. Verilənlər bazası serverinin url-i olaraq, Bağlantı və təhlükəsizlik bölməsində göstərilən Son nöqtədən istifadə etməlisiniz:

Apache Kafka və Spark Streaming ilə Data Streaming

Spark və Kafkanı düzgün birləşdirmək üçün işi artefaktdan istifadə edərək smark-submit vasitəsilə yerinə yetirməlisiniz. spark-streaming-kafka-0-8_2.11. Bundan əlavə, biz PostgreSQL verilənlər bazası ilə qarşılıqlı əlaqə üçün artefaktdan da istifadə edəcəyik, onları --paketlər vasitəsilə keçirəcəyik.

Skriptin çevikliyi üçün, həmçinin giriş parametrləri kimi mesaj serverinin adını və məlumat almaq istədiyimiz mövzunu çıxaracağıq.

Beləliklə, sistemi işə salmağın və sınaqdan keçirməyin vaxtı gəldi:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Hər şey alındı! Aşağıdakı şəkildə gördüyünüz kimi, proqram işləyərkən hər 2 saniyədən bir yeni toplama nəticələri göstərilir, çünki biz StreamingContext obyektini yaradanda paketləmə intervalını 2 saniyəyə təyin etmişik:

Apache Kafka və Spark Streaming ilə Data Streaming

Sonra, cədvəldəki qeydləri yoxlamaq üçün verilənlər bazasına sadə bir sorğu edirik əməliyyat_axı:

Apache Kafka və Spark Streaming ilə Data Streaming

Nəticə

Bu məqalədə Apache Kafka və PostgreSQL ilə birlikdə Spark Streaming-dən istifadə edərək axın məlumatlarının işlənməsi nümunəsi nəzərdən keçirilmişdir. Müxtəlif mənbələrdən alınan məlumatların həcminin artması ilə real vaxt rejimində və axın proqramları yaratmaq üçün Spark Streaming-in praktik dəyərini qiymətləndirmək çətindir.

Tam mənbə kodunu mənim depomda tapa bilərsiniz Github.

Bu məqaləni müzakirə etməkdən məmnunam, şərhlərinizi səbirsizliklə gözləyirəm, həmçinin bütün maraqlanan oxuculardan konstruktiv tənqidə ümid edirəm.

Uğurlar diləyirəm!

Məz. Əvvəlcə yerli PostgreSQL verilənlər bazasından istifadə etmək planlaşdırılırdı, lakin AWS-ə olan sevgimi nəzərə alaraq verilənlər bazasını buludlara köçürmək qərarına gəldim. Bu mövzu ilə bağlı növbəti məqalədə AWS Kinesis və AWS EMR-dən istifadə edərək yuxarıda təsvir edilən bütün sistemi AWS-də necə tətbiq edəcəyinizi sizə göstərəcəyəm. Xəbərləri izləyin!

Mənbə: www.habr.com

Добавить комментарий