Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Merhaba Habr! Bugün Spark Streaming'i kullanarak Apache Kafka mesaj akışlarını işleyecek ve işleme sonuçlarını AWS RDS bulut veritabanına yazacak bir sistem oluşturacağız.

Belirli bir kredi kurumunun bize tüm şubelerinde gelen işlemleri "anında" işleme görevini verdiğini düşünelim. Bu, hazine için açık bir döviz pozisyonunun, limitlerin veya işlemlere ilişkin finansal sonuçların vb. anında hesaplanması amacıyla yapılabilir.

Bu vakayı sihir ve büyü kullanmadan nasıl uygulayabilirsiniz - kesimin altını okuyun! Gitmek!

Apache Kafka ve Spark Streaming ile Akış Veri İşleme
(Resim kaynağı)

Giriş

Elbette büyük miktarda verinin gerçek zamanlı olarak işlenmesi, modern sistemlerde kullanım için geniş fırsatlar sağlar. Bunun için en popüler kombinasyonlardan biri, Kafka'nın gelen mesaj paketlerinden oluşan bir akış oluşturduğu ve Spark Streaming'in bu paketleri belirli bir zaman aralığında işlediği Apache Kafka ve Spark Streaming ikilisidir.

Uygulamanın hata toleransını arttırmak için kontrol noktalarını kullanacağız. Bu mekanizma sayesinde Spark Streaming motorunun kayıp verileri kurtarması gerektiğinde yalnızca son kontrol noktasına geri dönmesi ve hesaplamalara oradan devam etmesi yeterlidir.

Geliştirilen sistemin mimarisi

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Kullanılan bileşenler:

  • Apache Kafka dağıtılmış bir yayınlama-abone olma mesajlaşma sistemidir. Hem çevrimdışı hem de çevrimiçi mesaj tüketimi için uygundur. Veri kaybını önlemek için Kafka mesajları diskte saklanır ve küme içinde çoğaltılır. Kafka sistemi ZooKeeper senkronizasyon hizmetinin üzerine kurulmuştur;
  • Apache Spark Akışı - Akış verilerinin işlenmesi için Spark bileşeni. Spark Streaming modülü, veri akışının sürekli bir küçük veri paketleri dizisi olarak yorumlandığı bir mikro toplu mimari kullanılarak oluşturulmuştur. Spark Streaming, verileri farklı kaynaklardan alır ve bunları küçük paketler halinde birleştirir. Düzenli aralıklarla yeni paketler oluşturulur. Her zaman aralığının başında yeni bir paket oluşturulur ve bu aralıkta alınan veriler pakete dahil edilir. Aralığın sonunda paket büyümesi durur. Aralığın boyutu toplu iş aralığı adı verilen bir parametreyle belirlenir;
  • Apache Kıvılcım SQL'i - ilişkisel işlemeyi Spark fonksiyonel programlamayla birleştirir. Yapılandırılmış veriler, bir şemaya, yani tüm kayıtlar için tek bir alan kümesine sahip veriler anlamına gelir. Spark SQL, çeşitli yapılandırılmış veri kaynaklarından gelen girdileri destekler ve şema bilgilerinin kullanılabilirliği sayesinde yalnızca gerekli kayıt alanlarını verimli bir şekilde alabilir ve ayrıca DataFrame API'leri sağlar;
  • AWS RDS'si nispeten ucuz, bulut tabanlı bir ilişkisel veritabanıdır; kurulumu, işletimi ve ölçeklendirmeyi basitleştiren ve doğrudan Amazon tarafından yönetilen bir web hizmetidir.

Kafka sunucusunun kurulumu ve çalıştırılması

Kafka'yı doğrudan kullanmadan önce Java'ya sahip olduğunuzdan emin olmanız gerekir, çünkü... JVM iş için kullanılır:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kafka ile çalışacak yeni bir kullanıcı oluşturalım:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ardından dağıtımı resmi Apache Kafka web sitesinden indirin:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

İndirilen arşivi paketinden çıkarın:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Bir sonraki adım isteğe bağlıdır. Gerçek şu ki, varsayılan ayarlar Apache Kafka'nın tüm özelliklerini tam olarak kullanmanıza izin vermiyor. Örneğin mesajların yayınlanabileceği bir konuyu, kategoriyi, grubu silin. Bunu değiştirmek için konfigürasyon dosyasını düzenleyelim:

vim ~/kafka/config/server.properties

Dosyanın sonuna şunu ekleyin:

delete.topic.enable = true

Kafka sunucusunu başlatmadan önce ZooKeeper sunucusunu başlatmanız gerekiyor; Kafka dağıtımıyla birlikte gelen yardımcı betiği kullanacağız:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper başarıyla başladıktan sonra Kafka sunucusunu ayrı bir terminalde başlatın:

bin/kafka-server-start.sh config/server.properties

İşlem adında yeni bir konu oluşturalım:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Gerekli sayıda bölüm ve çoğaltmaya sahip bir konunun oluşturulduğundan emin olalım:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Yeni oluşturulan konu için üretici ve tüketiciyi test etme anlarını kaçıralım. Mesaj gönderip almayı nasıl test edebileceğiniz hakkında daha fazla ayrıntı resmi belgelerde yazılmıştır - Birkaç mesaj gönder. KafkaProducer API'sini kullanarak Python'da bir yapımcı yazmaya geçiyoruz.

Yapımcı yazısı

Üretici rastgele veri üretecek - saniyede 100 mesaj. Rastgele verilerle, üç alandan oluşan bir sözlüğü kastediyoruz:

  • şube - kredi kuruluşunun satış noktasının adı;
  • Para birimi — işlem para birimi;
  • Ücret - işlem tutarı. Tutar, Banka tarafından döviz alımı ise pozitif, satış ise negatif sayı olacaktır.

Üreticinin kodu şöyle görünür:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Daha sonra send yöntemini kullanarak sunucuya, ihtiyacımız olan konuya JSON formatında bir mesaj gönderiyoruz:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Betiği çalıştırırken terminalde aşağıdaki mesajları alıyoruz:

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Bu, her şeyin istediğimiz gibi çalıştığı anlamına gelir - yapımcı ihtiyacımız olan konuya mesajlar oluşturur ve gönderir.
Bir sonraki adım Spark'ı kurmak ve bu mesaj akışını işlemektir.

Apache Spark'ın Kurulumu

Apache Spark evrensel ve yüksek performanslı bir küme bilişim platformudur.

Spark, MapReduce modelinin popüler uygulamalarından daha iyi performans gösterirken etkileşimli sorgular ve akış işleme de dahil olmak üzere daha geniş bir yelpazedeki hesaplama türlerini destekler. Dakikalarca veya saatlerce beklemeden etkileşimli olarak çalışmanıza olanak tanıyan hız olduğundan, büyük miktarda veri işlenirken hız önemli bir rol oynar. Spark'ı bu kadar hızlı yapan en büyük güçlü yönlerden biri, bellek içi hesaplamalar yapabilme yeteneğidir.

Bu çerçeve Scala'da yazılmıştır, bu nedenle önce onu yüklemeniz gerekir:

sudo apt-get install scala

Spark dağıtımını resmi web sitesinden indirin:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Arşivi paketinden çıkarın:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Spark yolunu bash dosyasına ekleyin:

vim ~/.bashrc

Editör aracılığıyla aşağıdaki satırları ekleyin:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Bashrc'de değişiklik yaptıktan sonra aşağıdaki komutu çalıştırın:

source ~/.bashrc

AWS PostgreSQL'i dağıtma

Geriye kalan tek şey, akışlardan işlenmiş bilgileri yükleyeceğimiz veritabanını dağıtmaktır. Bunun için AWS RDS hizmetini kullanacağız.

AWS konsoluna gidin -> AWS RDS -> Veritabanları -> Veritabanı oluştur:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

PostgreSQL'i seçin ve İleri'ye tıklayın:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Çünkü Bu örnek yalnızca eğitim amaçlıdır; "minimum" (Ücretsiz Katman) ücretsiz bir sunucu kullanacağız:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Ardından, Ücretsiz Katman bloğuna bir onay işareti koyuyoruz ve bundan sonra otomatik olarak t2.micro sınıfının bir örneği bize sunulacak - zayıf olmasına rağmen ücretsiz ve görevimiz için oldukça uygun:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Daha sonra çok önemli şeyler gelir: veritabanı örneğinin adı, ana kullanıcının adı ve parolası. Örneğe myHabrTest, ana kullanıcı adını verelim: haber, şifre: habr12345 ve İleri düğmesine tıklayın:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Bir sonraki sayfada veritabanı sunucumuzun dışarıdan erişilebilirliğinden (Genel erişilebilirlik) ve port kullanılabilirliğinden sorumlu parametreler bulunmaktadır:

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

VPC güvenlik grubu için veritabanı sunucumuza 5432 numaralı bağlantı noktası (PostgreSQL) üzerinden harici erişime izin verecek yeni bir ayar oluşturalım.
AWS konsolunda ayrı bir tarayıcı penceresinde VPC Kontrol Paneli -> Güvenlik Grupları -> Güvenlik grubu oluştur bölümüne gidelim:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Güvenlik grubunun adını belirledik - PostgreSQL, bir açıklama, bu grubun hangi VPC ile ilişkilendirilmesi gerektiğini belirtin ve Oluştur düğmesine tıklayın:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Yeni oluşturulan grup için 5432 numaralı bağlantı noktası için Inbound kurallarını aşağıdaki resimde gösterildiği gibi doldurun. Bağlantı noktasını manuel olarak belirtemezsiniz ancak Tür açılır listesinden PostgreSQL'i seçin.

Açıkça konuşursak, ::/0 değeri dünyanın her yerinden sunucuya gelen trafiğin kullanılabilirliği anlamına gelir; bu kural olarak tamamen doğru değildir, ancak örneği analiz etmek için kendimize şu yaklaşımı kullanma izni verelim:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

“Gelişmiş ayarları yapılandır” seçeneğinin açık olduğu tarayıcı sayfasına dönüyoruz ve VPC güvenlik grupları bölümünde -> Mevcut VPC güvenlik gruplarını seç -> PostgreSQL'i seçiyoruz:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Daha sonra, Veritabanı seçenekleri -> Veritabanı adı -> adı ayarlayın - habrDB.

Yedeklemeyi devre dışı bırakma (yedek tutma süresi - 0 gün), izleme ve Performans Analizleri dışında kalan parametreleri varsayılan olarak bırakabiliriz. Düğmeye tıklayın Veritabanı oluştur:
Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Konu işleyici

Son aşama ise her iki saniyede bir Kafka'dan gelen yeni verileri işleyecek ve sonucu veritabanına girecek bir Spark işinin geliştirilmesi olacak.

Yukarıda belirtildiği gibi kontrol noktaları, SparkStreaming'de hata toleransını sağlayacak şekilde yapılandırılması gereken temel bir mekanizmadır. Kontrol noktalarını kullanacağız ve prosedür başarısız olursa Spark Streaming modülünün yalnızca son kontrol noktasına dönmesi ve kayıp verileri kurtarmak için hesaplamaları buradan sürdürmesi gerekecektir.

Denetim noktası bilgilerinin depolanacağı hataya dayanıklı, güvenilir bir dosya sistemi (HDFS, S3 vb. gibi) üzerinde bir dizin ayarlanarak denetim noktası oluşturma etkinleştirilebilir. Bu, örneğin aşağıdakiler kullanılarak yapılır:

streamingContext.checkpoint(checkpointDirectory)

Örneğimizde aşağıdaki yaklaşımı kullanacağız, yani eğer checkpointDirectory mevcutsa, bağlam kontrol noktası verilerinden yeniden oluşturulacaktır. Dizin mevcut değilse (yani ilk kez çalıştırıldıysa), yeni bir bağlam oluşturmak ve DStreams'i yapılandırmak için functionToCreateContext çağrılır:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils kütüphanesinin createDirectStream metodunu kullanarak “işlem” konusuna bağlanmak için bir DirectStream nesnesi oluşturuyoruz:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Gelen veriler JSON formatında ayrıştırılıyor:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL'i kullanarak basit bir gruplama yapıyoruz ve sonucu konsolda gösteriyoruz:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Sorgu metnini alma ve Spark SQL üzerinden çalıştırma:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Daha sonra elde edilen toplu verileri AWS RDS'deki bir tabloya kaydederiz. Toplama sonuçlarını bir veritabanı tablosuna kaydetmek için DataFrame nesnesinin write yöntemini kullanacağız:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS ile bağlantı kurma hakkında birkaç söz. Bunun için kullanıcı ve şifreyi “AWS PostgreSQL Dağıtımı” adımında oluşturduk. Bağlantı ve güvenlik bölümünde görüntülenen veritabanı sunucusu URL'si olarak Endpoint'i kullanmalısınız:

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Spark ve Kafka'yı doğru bir şekilde bağlamak için işi, yapıyı kullanarak smark-submit yoluyla çalıştırmalısınız. spark-streaming-kafka-0-8_2.11. Ayrıca PostgreSQL veritabanıyla etkileşim için bir yapı kullanacağız; bunları --packages aracılığıyla aktaracağız.

Komut dosyasının esnekliği için, giriş parametreleri olarak mesaj sunucusunun adını ve veri almak istediğimiz konuyu da dahil edeceğiz.

Artık sistemin işlevselliğini başlatıp kontrol etme zamanı geldi:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Her şey yolunda gitti! Aşağıdaki resimde görebileceğiniz gibi uygulama çalışırken, StreamingContext nesnesini oluşturduğumuzda toplu iş aralığını 2 saniyeye ayarladığımız için her 2 saniyede bir yeni toplama sonuçları çıkıyor:

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Daha sonra tablodaki kayıtların varlığını kontrol etmek için veritabanına basit bir sorgulama yapıyoruz. işlem_akışı:

Apache Kafka ve Spark Streaming ile Akış Veri İşleme

Sonuç

Bu makalede, Apache Kafka ve PostgreSQL ile birlikte Spark Streaming kullanılarak bilgilerin akış halinde işlenmesine ilişkin bir örnek incelendi. Çeşitli kaynaklardan gelen verilerin artmasıyla birlikte, Spark Streaming'in akış ve gerçek zamanlı uygulamalar oluşturmaya yönelik pratik değerini abartmak zordur.

Kaynak kodunun tamamını şu adresteki depomda bulabilirsiniz: GitHub.

Bu makaleyi tartışmaktan mutluluk duyuyorum, yorumlarınızı sabırsızlıkla bekliyorum ve ayrıca duyarlı tüm okuyuculardan yapıcı eleştiriler bekliyorum.

Sana başarılar diliyorum!

Ps. Başlangıçta yerel PostgreSQL veritabanının kullanılması planlanmıştı ancak AWS'ye olan sevgim nedeniyle veritabanını buluta taşımaya karar verdim. Bu konuyla ilgili bir sonraki makalede, yukarıda anlatılan tüm sistemin AWS Kinesis ve AWS EMR kullanılarak AWS'de nasıl uygulanacağını göstereceğim. Haberleri takip edin!

Kaynak: habr.com

Yorum ekle