ProHoster > Blog > yönetim > Apache Kafka ve Spark Streaming ile Akış Veri İşleme
Apache Kafka ve Spark Streaming ile Akış Veri İşleme
Merhaba Habr! Bugün Spark Streaming'i kullanarak Apache Kafka mesaj akışlarını işleyecek ve işleme sonuçlarını AWS RDS bulut veritabanına yazacak bir sistem oluşturacağız.
Belirli bir kredi kurumunun bize tüm şubelerinde gelen işlemleri "anında" işleme görevini verdiğini düşünelim. Bu, hazine için açık bir döviz pozisyonunun, limitlerin veya işlemlere ilişkin finansal sonuçların vb. anında hesaplanması amacıyla yapılabilir.
Bu vakayı sihir ve büyü kullanmadan nasıl uygulayabilirsiniz - kesimin altını okuyun! Gitmek!
Elbette büyük miktarda verinin gerçek zamanlı olarak işlenmesi, modern sistemlerde kullanım için geniş fırsatlar sağlar. Bunun için en popüler kombinasyonlardan biri, Kafka'nın gelen mesaj paketlerinden oluşan bir akış oluşturduğu ve Spark Streaming'in bu paketleri belirli bir zaman aralığında işlediği Apache Kafka ve Spark Streaming ikilisidir.
Uygulamanın hata toleransını arttırmak için kontrol noktalarını kullanacağız. Bu mekanizma sayesinde Spark Streaming motorunun kayıp verileri kurtarması gerektiğinde yalnızca son kontrol noktasına geri dönmesi ve hesaplamalara oradan devam etmesi yeterlidir.
Geliştirilen sistemin mimarisi
Kullanılan bileşenler:
Apache Kafka dağıtılmış bir yayınlama-abone olma mesajlaşma sistemidir. Hem çevrimdışı hem de çevrimiçi mesaj tüketimi için uygundur. Veri kaybını önlemek için Kafka mesajları diskte saklanır ve küme içinde çoğaltılır. Kafka sistemi ZooKeeper senkronizasyon hizmetinin üzerine kurulmuştur;
Apache Spark Akışı - Akış verilerinin işlenmesi için Spark bileşeni. Spark Streaming modülü, veri akışının sürekli bir küçük veri paketleri dizisi olarak yorumlandığı bir mikro toplu mimari kullanılarak oluşturulmuştur. Spark Streaming, verileri farklı kaynaklardan alır ve bunları küçük paketler halinde birleştirir. Düzenli aralıklarla yeni paketler oluşturulur. Her zaman aralığının başında yeni bir paket oluşturulur ve bu aralıkta alınan veriler pakete dahil edilir. Aralığın sonunda paket büyümesi durur. Aralığın boyutu toplu iş aralığı adı verilen bir parametreyle belirlenir;
Apache Kıvılcım SQL'i - ilişkisel işlemeyi Spark fonksiyonel programlamayla birleştirir. Yapılandırılmış veriler, bir şemaya, yani tüm kayıtlar için tek bir alan kümesine sahip veriler anlamına gelir. Spark SQL, çeşitli yapılandırılmış veri kaynaklarından gelen girdileri destekler ve şema bilgilerinin kullanılabilirliği sayesinde yalnızca gerekli kayıt alanlarını verimli bir şekilde alabilir ve ayrıca DataFrame API'leri sağlar;
AWS RDS'si nispeten ucuz, bulut tabanlı bir ilişkisel veritabanıdır; kurulumu, işletimi ve ölçeklendirmeyi basitleştiren ve doğrudan Amazon tarafından yönetilen bir web hizmetidir.
Kafka sunucusunun kurulumu ve çalıştırılması
Kafka'yı doğrudan kullanmadan önce Java'ya sahip olduğunuzdan emin olmanız gerekir, çünkü... JVM iş için kullanılır:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Bir sonraki adım isteğe bağlıdır. Gerçek şu ki, varsayılan ayarlar Apache Kafka'nın tüm özelliklerini tam olarak kullanmanıza izin vermiyor. Örneğin mesajların yayınlanabileceği bir konuyu, kategoriyi, grubu silin. Bunu değiştirmek için konfigürasyon dosyasını düzenleyelim:
vim ~/kafka/config/server.properties
Dosyanın sonuna şunu ekleyin:
delete.topic.enable = true
Kafka sunucusunu başlatmadan önce ZooKeeper sunucusunu başlatmanız gerekiyor; Kafka dağıtımıyla birlikte gelen yardımcı betiği kullanacağız:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper başarıyla başladıktan sonra Kafka sunucusunu ayrı bir terminalde başlatın:
Yeni oluşturulan konu için üretici ve tüketiciyi test etme anlarını kaçıralım. Mesaj gönderip almayı nasıl test edebileceğiniz hakkında daha fazla ayrıntı resmi belgelerde yazılmıştır - Birkaç mesaj gönder. KafkaProducer API'sini kullanarak Python'da bir yapımcı yazmaya geçiyoruz.
Yapımcı yazısı
Üretici rastgele veri üretecek - saniyede 100 mesaj. Rastgele verilerle, üç alandan oluşan bir sözlüğü kastediyoruz:
şube - kredi kuruluşunun satış noktasının adı;
Para birimi — işlem para birimi;
Ücret - işlem tutarı. Tutar, Banka tarafından döviz alımı ise pozitif, satış ise negatif sayı olacaktır.
Daha sonra send yöntemini kullanarak sunucuya, ihtiyacımız olan konuya JSON formatında bir mesaj gönderiyoruz:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Betiği çalıştırırken terminalde aşağıdaki mesajları alıyoruz:
Bu, her şeyin istediğimiz gibi çalıştığı anlamına gelir - yapımcı ihtiyacımız olan konuya mesajlar oluşturur ve gönderir.
Bir sonraki adım Spark'ı kurmak ve bu mesaj akışını işlemektir.
Apache Spark'ın Kurulumu
Apache Spark evrensel ve yüksek performanslı bir küme bilişim platformudur.
Spark, MapReduce modelinin popüler uygulamalarından daha iyi performans gösterirken etkileşimli sorgular ve akış işleme de dahil olmak üzere daha geniş bir yelpazedeki hesaplama türlerini destekler. Dakikalarca veya saatlerce beklemeden etkileşimli olarak çalışmanıza olanak tanıyan hız olduğundan, büyük miktarda veri işlenirken hız önemli bir rol oynar. Spark'ı bu kadar hızlı yapan en büyük güçlü yönlerden biri, bellek içi hesaplamalar yapabilme yeteneğidir.
Bu çerçeve Scala'da yazılmıştır, bu nedenle önce onu yüklemeniz gerekir:
Çünkü Bu örnek yalnızca eğitim amaçlıdır; "minimum" (Ücretsiz Katman) ücretsiz bir sunucu kullanacağız:
Ardından, Ücretsiz Katman bloğuna bir onay işareti koyuyoruz ve bundan sonra otomatik olarak t2.micro sınıfının bir örneği bize sunulacak - zayıf olmasına rağmen ücretsiz ve görevimiz için oldukça uygun:
Daha sonra çok önemli şeyler gelir: veritabanı örneğinin adı, ana kullanıcının adı ve parolası. Örneğe myHabrTest, ana kullanıcı adını verelim: haber, şifre: habr12345 ve İleri düğmesine tıklayın:
Bir sonraki sayfada veritabanı sunucumuzun dışarıdan erişilebilirliğinden (Genel erişilebilirlik) ve port kullanılabilirliğinden sorumlu parametreler bulunmaktadır:
VPC güvenlik grubu için veritabanı sunucumuza 5432 numaralı bağlantı noktası (PostgreSQL) üzerinden harici erişime izin verecek yeni bir ayar oluşturalım.
AWS konsolunda ayrı bir tarayıcı penceresinde VPC Kontrol Paneli -> Güvenlik Grupları -> Güvenlik grubu oluştur bölümüne gidelim:
Güvenlik grubunun adını belirledik - PostgreSQL, bir açıklama, bu grubun hangi VPC ile ilişkilendirilmesi gerektiğini belirtin ve Oluştur düğmesine tıklayın:
Yeni oluşturulan grup için 5432 numaralı bağlantı noktası için Inbound kurallarını aşağıdaki resimde gösterildiği gibi doldurun. Bağlantı noktasını manuel olarak belirtemezsiniz ancak Tür açılır listesinden PostgreSQL'i seçin.
Açıkça konuşursak, ::/0 değeri dünyanın her yerinden sunucuya gelen trafiğin kullanılabilirliği anlamına gelir; bu kural olarak tamamen doğru değildir, ancak örneği analiz etmek için kendimize şu yaklaşımı kullanma izni verelim:
“Gelişmiş ayarları yapılandır” seçeneğinin açık olduğu tarayıcı sayfasına dönüyoruz ve VPC güvenlik grupları bölümünde -> Mevcut VPC güvenlik gruplarını seç -> PostgreSQL'i seçiyoruz:
Daha sonra, Veritabanı seçenekleri -> Veritabanı adı -> adı ayarlayın - habrDB.
Yedeklemeyi devre dışı bırakma (yedek tutma süresi - 0 gün), izleme ve Performans Analizleri dışında kalan parametreleri varsayılan olarak bırakabiliriz. Düğmeye tıklayın Veritabanı oluştur:
Konu işleyici
Son aşama ise her iki saniyede bir Kafka'dan gelen yeni verileri işleyecek ve sonucu veritabanına girecek bir Spark işinin geliştirilmesi olacak.
Yukarıda belirtildiği gibi kontrol noktaları, SparkStreaming'de hata toleransını sağlayacak şekilde yapılandırılması gereken temel bir mekanizmadır. Kontrol noktalarını kullanacağız ve prosedür başarısız olursa Spark Streaming modülünün yalnızca son kontrol noktasına dönmesi ve kayıp verileri kurtarmak için hesaplamaları buradan sürdürmesi gerekecektir.
Denetim noktası bilgilerinin depolanacağı hataya dayanıklı, güvenilir bir dosya sistemi (HDFS, S3 vb. gibi) üzerinde bir dizin ayarlanarak denetim noktası oluşturma etkinleştirilebilir. Bu, örneğin aşağıdakiler kullanılarak yapılır:
streamingContext.checkpoint(checkpointDirectory)
Örneğimizde aşağıdaki yaklaşımı kullanacağız, yani eğer checkpointDirectory mevcutsa, bağlam kontrol noktası verilerinden yeniden oluşturulacaktır. Dizin mevcut değilse (yani ilk kez çalıştırıldıysa), yeni bir bağlam oluşturmak ve DStreams'i yapılandırmak için functionToCreateContext çağrılır:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils kütüphanesinin createDirectStream metodunu kullanarak “işlem” konusuna bağlanmak için bir DirectStream nesnesi oluşturuyoruz:
Spark SQL'i kullanarak basit bir gruplama yapıyoruz ve sonucu konsolda gösteriyoruz:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Sorgu metnini alma ve Spark SQL üzerinden çalıştırma:
Daha sonra elde edilen toplu verileri AWS RDS'deki bir tabloya kaydederiz. Toplama sonuçlarını bir veritabanı tablosuna kaydetmek için DataFrame nesnesinin write yöntemini kullanacağız:
AWS RDS ile bağlantı kurma hakkında birkaç söz. Bunun için kullanıcı ve şifreyi “AWS PostgreSQL Dağıtımı” adımında oluşturduk. Bağlantı ve güvenlik bölümünde görüntülenen veritabanı sunucusu URL'si olarak Endpoint'i kullanmalısınız:
Spark ve Kafka'yı doğru bir şekilde bağlamak için işi, yapıyı kullanarak smark-submit yoluyla çalıştırmalısınız. spark-streaming-kafka-0-8_2.11. Ayrıca PostgreSQL veritabanıyla etkileşim için bir yapı kullanacağız; bunları --packages aracılığıyla aktaracağız.
Komut dosyasının esnekliği için, giriş parametreleri olarak mesaj sunucusunun adını ve veri almak istediğimiz konuyu da dahil edeceğiz.
Artık sistemin işlevselliğini başlatıp kontrol etme zamanı geldi:
Her şey yolunda gitti! Aşağıdaki resimde görebileceğiniz gibi uygulama çalışırken, StreamingContext nesnesini oluşturduğumuzda toplu iş aralığını 2 saniyeye ayarladığımız için her 2 saniyede bir yeni toplama sonuçları çıkıyor:
Daha sonra tablodaki kayıtların varlığını kontrol etmek için veritabanına basit bir sorgulama yapıyoruz. işlem_akışı:
Sonuç
Bu makalede, Apache Kafka ve PostgreSQL ile birlikte Spark Streaming kullanılarak bilgilerin akış halinde işlenmesine ilişkin bir örnek incelendi. Çeşitli kaynaklardan gelen verilerin artmasıyla birlikte, Spark Streaming'in akış ve gerçek zamanlı uygulamalar oluşturmaya yönelik pratik değerini abartmak zordur.
Kaynak kodunun tamamını şu adresteki depomda bulabilirsiniz: GitHub.
Bu makaleyi tartışmaktan mutluluk duyuyorum, yorumlarınızı sabırsızlıkla bekliyorum ve ayrıca duyarlı tüm okuyuculardan yapıcı eleştiriler bekliyorum.
Sana başarılar diliyorum!
Ps. Başlangıçta yerel PostgreSQL veritabanının kullanılması planlanmıştı ancak AWS'ye olan sevgim nedeniyle veritabanını buluta taşımaya karar verdim. Bu konuyla ilgili bir sonraki makalede, yukarıda anlatılan tüm sistemin AWS Kinesis ve AWS EMR kullanılarak AWS'de nasıl uygulanacağını göstereceğim. Haberleri takip edin!