“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler" Merhaba Khabro sakinleri! Bu kitap, iş parçacığı işlemeyi anlamak isteyen her geliştirici için uygundur. Dağıtılmış programlamayı anlamak, Kafka ve Kafka Akışlarını daha iyi anlamanıza yardımcı olacaktır. Kafka çerçevesinin kendisini bilmek güzel olurdu ama bu gerekli değil: İhtiyacınız olan her şeyi size anlatacağım. Deneyimli Kafka geliştiricileri ve acemiler, bu kitapta Kafka Streams kütüphanesini kullanarak ilginç akış işleme uygulamalarının nasıl oluşturulacağını öğrenecekler. Serileştirme gibi kavramlara zaten aşina olan orta ve ileri düzey Java geliştiricileri, Kafka Streams uygulamaları oluşturmak için becerilerini uygulamayı öğrenecekler. Kitabın kaynak kodu Java 8'de yazılmıştır ve Java 8 lambda ifadesi sözdizimini önemli ölçüde kullanır; dolayısıyla lambda işlevleriyle nasıl çalışılacağını bilmek (başka bir programlama dilinde bile) faydalı olacaktır.

Alıntı. 5.3. Toplama ve pencereleme işlemleri

Bu bölümde Kafka Akımlarının en umut verici kısımlarını keşfetmeye devam edeceğiz. Şu ana kadar Kafka Akımlarının aşağıdaki yönlerini ele aldık:

  • bir işleme topolojisi oluşturma;
  • akış uygulamalarında durumun kullanılması;
  • veri akışı bağlantılarının gerçekleştirilmesi;
  • olay akışları (KStream) ile güncelleme akışları (KTable) arasındaki farklar.

Aşağıdaki örneklerde tüm bu unsurları bir araya getireceğiz. Ayrıca akış uygulamalarının bir başka harika özelliği olan pencereleme hakkında da bilgi edineceksiniz. İlk örneğimiz basit bir toplama olacak.

5.3.1. Hisse satışlarının sanayi sektörüne göre toplanması

Toplama ve gruplama, akış verileriyle çalışırken hayati önem taşıyan araçlardır. Bireysel kayıtların alındıkları anda incelenmesi çoğu zaman yetersizdir. Verilerden ek bilgi elde etmek için bunları gruplandırmak ve birleştirmek gerekir.

Bu örnekte, çeşitli sektörlerdeki şirketlerin hisse senetlerinin satış hacmini takip etmesi gereken bir günlük tüccarın kostümünü giyeceksiniz. Özellikle, her sektörde en büyük hisse satışına sahip beş şirketle ilgileniyorsunuz.

Bu tür bir toplama, verileri istenen forma dönüştürmek için (genel anlamda konuşursak) aşağıdaki birkaç adımı gerektirecektir.

  1. Ham hisse senedi alım satım bilgilerini yayınlayan konuya dayalı bir kaynak oluşturun. StockTransaction türündeki bir nesneyi ShareVolume türündeki bir nesneyle eşlememiz gerekecek. Mesele şu ki, StockTransaction nesnesi satış meta verilerini içeriyor, ancak bizim yalnızca satılan hisselerin sayısıyla ilgili verilere ihtiyacımız var.
  2. ShareVolume verilerini hisse senedi sembolüne göre gruplandırın. Sembole göre gruplandırıldıktan sonra bu verileri stok satış hacimlerinin alt toplamlarına daraltabilirsiniz. KStream.groupBy yönteminin KGroupedStream türünün bir örneğini döndürdüğünü belirtmekte fayda var. Ayrıca KGroupedStream.reduce yöntemini çağırarak bir KTable örneği alabilirsiniz.

KGroupedStream arayüzü nedir?

KStream.groupBy ve KStream.groupByKey yöntemleri, KGroupedStream'in bir örneğini döndürür. KGroupedStream, anahtarlara göre gruplandırıldıktan sonra bir olay akışının ara temsilidir. Onunla doğrudan çalışmak için tasarlanmamıştır. Bunun yerine, her zaman bir KTable ile sonuçlanan toplama işlemleri için KGroupedStream kullanılır. Toplama işlemlerinin sonucu bir KTable olduğundan ve bir durum deposu kullandıklarından, sonuç olarak tüm güncellemelerin işlem hattının daha aşağılarına gönderilmemesi mümkündür.

KTable.groupBy yöntemi, anahtara göre yeniden gruplandırılmış güncelleme akışının ara temsili olan benzer bir KGroupedTable döndürür.

Kısa bir ara verelim ve Şekil 5.9'ye bakalım. XNUMX, neleri başardığımızı gösteriyor. Bu topoloji size zaten çok tanıdık geliyor olmalı.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Şimdi bu topolojinin koduna bakalım (src/main/Java/bbejeck/chapter_5/AggregationsAndReducingExample.java dosyasında bulunabilir) (Liste 5.2).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Verilen kod, kısalığı ve birkaç satırda gerçekleştirilen çok sayıda eylem ile ayırt edilir. builder.stream yönteminin ilk parametresinde yeni bir şey fark edebilirsiniz: Consumed.withOffsetResetPolicy yöntemi kullanılarak ayarlanan AutoOffsetReset.EARLIEST enum türünün bir değeri (ayrıca bir LATEST de vardır). Bu numaralandırma türü, her KStream veya KTable için bir ofset sıfırlama stratejisi belirlemek için kullanılabilir ve konfigürasyondaki ofset sıfırlama seçeneğine göre önceliklidir.

GroupByKey ve GroupBy

KStream arayüzünün kayıtları gruplamak için iki yöntemi vardır: GroupByKey ve GroupBy. Her ikisi de bir KGroupedTable döndürür; dolayısıyla aralarındaki farkın ne olduğunu ve hangisinin ne zaman kullanılacağını merak ediyor olabilirsiniz.

GroupByKey yöntemi, KStream'deki anahtarlar zaten boş olmadığında kullanılır. Ve en önemlisi, "yeniden bölümleme gerektirir" bayrağı hiçbir zaman ayarlanmadı.

GroupBy yöntemi, gruplandırma anahtarlarını değiştirdiğinizi ve dolayısıyla yeniden bölümleme bayrağının true olarak ayarlandığını varsayar. GroupBy yönteminden sonra birleştirme, toplama vb. gerçekleştirmek, otomatik olarak yeniden bölümlendirmeyle sonuçlanacaktır.
Özet: Mümkün olduğunda GroupBy yerine GroupByKey'i kullanmalısınız.

MapValues ​​ve groupBy yöntemlerinin ne yaptığı açık, o yüzden sum() yöntemine bir göz atalım (src/main/java/bbejeck/model/ShareVolume.java'da bulunur) (Liste 5.3).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
ShareVolume.sum yöntemi, hisse senedi satış hacminin cari toplamını döndürür ve tüm hesaplama zincirinin sonucu bir KTable nesnesidir. . Artık KTable'ın oynadığı rolü anlıyorsunuz. ShareVolume nesneleri geldiğinde, karşılık gelen KTable nesnesi en son güncel güncellemeyi saklar. Tüm güncellemelerin önceki shareVolumeKTable'a yansıtıldığını ancak hepsinin daha ileri gönderilmediğini unutmamak önemlidir.

Daha sonra bu KTtabloyu, her sektörde en yüksek hisse senedi hacmine sahip beş şirkete ulaşmak için (işlem gören hisse sayısına göre) toplamak için kullanırız. Bu durumda eylemlerimiz ilk toplamadakilere benzer olacaktır.

  1. Bireysel ShareVolume nesnelerini sektöre göre gruplandırmak için başka bir groupBy işlemi gerçekleştirin.
  2. ShareVolume nesnelerini özetlemeye başlayın. Bu sefer toplama nesnesi sabit boyutlu bir öncelik sırasıdır. Bu sabit büyüklükteki kuyrukta yalnızca en fazla hisse satışına sahip beş şirket tutulur.
  3. Önceki paragraftaki kuyrukları bir dize değeriyle eşleştirin ve sektöre göre en çok işlem gören ilk beş hisse senedini döndürün.
  4. Sonuçları string biçiminde konuya yazın.

İncirde. Şekil 5.10 veri akışı topolojisi grafiğini göstermektedir. Gördüğünüz gibi ikinci işlem turu oldukça basittir.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Artık bu ikinci işlem turunun yapısını net bir şekilde anladığımıza göre kaynak koduna dönebiliriz (bunu src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java dosyasında bulacaksınız) (Liste 5.4) .

Bu başlatıcı bir sabitQueue değişkeni içerir. Bu, ticareti yapılan hisselerin azalan sırasına göre ilk N sonuçlarını izlemek için kullanılan Java.util.TreeSet için bir bağdaştırıcı olan özel bir nesnedir.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
GroupBy ve mapValues ​​çağrılarını zaten gördünüz, bu yüzden bunlara girmeyeceğiz (KTable.print yöntemi kullanımdan kaldırıldığı için KTable.toStream yöntemini çağırıyoruz). Ama henüzgregat()'ın KTable versiyonunu görmediniz, bu yüzden bunu tartışmak için biraz zaman harcayacağız.

Hatırlayacağınız gibi KTable'ı farklı kılan şey, aynı anahtarlara sahip kayıtların güncelleme olarak değerlendirilmesidir. KTable eski girişi yenisiyle değiştirir. Toplama da benzer şekilde gerçekleşir: Aynı anahtara sahip en son kayıtlar toplanır. Bir kayıt geldiğinde, bir toplayıcı (toplu yöntem çağrısındaki ikinci parametre) kullanılarak DifferentSizePriorityQueue sınıfı örneğine eklenir, ancak aynı anahtara sahip başka bir kayıt zaten mevcutsa eski kayıt bir çıkarıcı kullanılarak kaldırılır (üçüncü parametre) toplu yöntem çağrısı).

Bütün bunlar, toplayıcımız olan FixSizePriorityQueue'nun tüm değerleri tek bir anahtarda toplamadığı, ancak en çok işlem gören N hisse senedi türünün miktarlarının hareketli bir toplamını sakladığı anlamına gelir. Gelen her giriş şu ana kadar satılan toplam hisse sayısını içerir. KTable, her güncellemenin sürekli olarak toplanmasına gerek kalmadan size hangi şirketlerin hisselerinin şu anda en çok işlem gördüğü hakkında bilgi verecektir.

İki önemli şeyi yapmayı öğrendik:

  • KTable'daki değerleri ortak bir anahtarla gruplayın;
  • bu gruplandırılmış değerler üzerinde toplama ve toplama gibi faydalı işlemleri gerçekleştirin.

Bu işlemlerin nasıl gerçekleştirileceğini bilmek, Kafka Streams uygulaması üzerinden geçen verilerin anlamını anlamak ve hangi bilgileri taşıdığını anlamak açısından önemlidir.

Ayrıca bu kitapta daha önce tartışılan bazı temel kavramları bir araya getirdik. 4. Bölümde hataya dayanıklı yerel durumun bir akış uygulaması için ne kadar önemli olduğunu tartıştık. Bu bölümdeki ilk örnek, yerel durumun neden bu kadar önemli olduğunu gösterdi; size daha önce gördüğünüz bilgilerin izini sürme yeteneği verir. Yerel erişim, ağ gecikmelerini önleyerek uygulamayı daha performanslı ve hatalara karşı dayanıklı hale getirir.

Herhangi bir toplama veya toplama işlemi gerçekleştirirken durum deposunun adını belirtmeniz gerekir. Toplama ve toplama işlemleri bir KTable örneği döndürür ve KTable, eski sonuçları yenileriyle değiştirmek için durum depolamayı kullanır. Gördüğünüz gibi, tüm güncellemeler ardı ardına gönderilmez ve bu önemlidir çünkü toplama işlemleri özet bilgiler üretmek üzere tasarlanmıştır. Yerel durumu uygulamazsanız, KTable tüm toplama ve toplama sonuçlarını iletecektir.

Daha sonra, pencereleme işlemleri adı verilen toplama gibi işlemleri belirli bir süre içinde gerçekleştirmeye bakacağız.

5.3.2. Pencere işlemleri

Önceki bölümde kayan evrişimi ve toplamayı tanıttık. Uygulama, hisse senedi satış hacminin sürekli olarak toplanmasını ve ardından borsada en çok işlem gören beş hisse senedinin toplanmasını gerçekleştirdi.

Bazen sonuçların bu şekilde sürekli olarak toplanması ve özetlenmesi gerekli olabilir. Bazen işlemleri yalnızca belirli bir süre boyunca gerçekleştirmeniz gerekir. Örneğin, son 10 dakika içinde belirli bir şirketin hisseleriyle kaç tane takas işlemi yapıldığını hesaplayın. Veya son 15 dakika içinde kaç kullanıcının yeni bir reklam bannerına tıkladığını. Bir uygulama bu tür işlemleri birden çok kez gerçekleştirebilir, ancak sonuçları yalnızca belirli zaman dilimleri (zaman pencereleri) için geçerli olur.

Takas işlemlerinin alıcıya göre sayılması

Bir sonraki örnekte, ister büyük kuruluşlar ister akıllı bireysel finansörler olsun, birden fazla yatırımcı arasındaki hisse senedi işlemlerini takip edeceğiz.

Bu izlemenin iki olası nedeni vardır. Bunlardan biri, pazar liderlerinin ne alıp sattığını bilme ihtiyacıdır. Bu büyük oyuncular ve bilgili yatırımcılar fırsatı görürlerse onların stratejisini takip etmek mantıklı olacaktır. İkinci sebep, yasa dışı içeriden bilgi ticaretinin olası işaretlerini tespit etme arzusudur. Bunu yapmak için, büyük satış artışlarının önemli basın bültenleriyle ilişkisini analiz etmeniz gerekecek.

Bu tür izleme aşağıdaki adımlardan oluşur:

  • hisse senedi işlemleri konusundan okumak için bir akış oluşturmak;
  • Gelen kayıtları alıcı kimliğine ve hisse senedi sembolüne göre gruplama. groupBy yönteminin çağrılması, KGroupedStream sınıfının bir örneğini döndürür;
  • KGroupedStream.windowedBy yöntemi, pencereli toplamaya izin veren bir zaman penceresiyle sınırlı bir veri akışı döndürür. Pencere türüne bağlı olarak TimeWindowedKStream veya SessionWindowedKStream döndürülür;
  • toplama işlemi için işlem sayısı. Pencereli veri akışı, bu sayımda belirli bir kaydın dikkate alınıp alınmayacağını belirler;
  • Sonuçların bir konuya yazılması veya geliştirme sırasında bunların konsola gönderilmesi.

Bu uygulamanın topolojisi basittir ancak net bir resmi faydalı olacaktır. Şekil 5.11'ye bir göz atalım. XNUMX.

Daha sonra pencere işlemlerinin işlevselliğine ve ilgili koda bakacağız.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"

Pencere türleri

Kafka Akışlarında üç tür pencere vardır:

  • oturumlu;
  • “takla atma” (takla atma);
  • kaymak/atlamak.

Hangisini seçeceğiniz iş gereksinimlerinize bağlıdır. Yuvarlanan ve atlayan pencereler zamanla sınırlıdır, oturum pencereleri ise kullanıcı etkinliğine göre sınırlıdır; oturum(lar)ın süresi yalnızca kullanıcının ne kadar aktif olduğuna göre belirlenir. Hatırlanması gereken en önemli şey, tüm pencere türlerinin sistem saatine değil, girişlerin tarih/saat damgalarına dayalı olmasıdır.

Daha sonra topolojimizi her pencere tipiyle uyguluyoruz. Kodun tamamı yalnızca ilk örnekte verilecektir; diğer pencere türleri için, pencere işleminin türü dışında hiçbir şey değişmeyecektir.

Oturum pencereleri

Oturum pencereleri diğer tüm pencere türlerinden çok farklıdır. Bunlar zamanla sınırlı değildir, kullanıcının etkinliği (veya izlemek istediğiniz varlığın etkinliği) ile sınırlıdır. Oturum pencereleri etkin olmama dönemleriyle sınırlandırılır.

Şekil 5.12 oturum pencereleri kavramını göstermektedir. Daha küçük olan oturum solundaki oturumla birleşecektir. Sağdaki oturum ise ayrı olacak çünkü uzun bir hareketsizlik döneminin ardından geliyor. Oturum pencereleri kullanıcı etkinliğine dayalıdır ancak girişin hangi oturuma ait olduğunu belirlemek için girişlerdeki tarih/saat damgalarını kullanır.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"

Hisse senedi işlemlerini takip etmek için oturum pencerelerini kullanma

Değişim işlemleriyle ilgili bilgileri yakalamak için oturum pencerelerini kullanalım. Oturum pencerelerinin uygulanması Liste 5.5'te gösterilmektedir (bu, src/main/Java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java'da bulunabilir).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bu topolojideki işlemlerin çoğunu zaten gördünüz, bu yüzden onlara burada tekrar bakmanıza gerek yok. Ancak burada şimdi tartışacağımız birçok yeni unsur da var.

Herhangi bir groupBy işlemi genellikle bir tür toplama işlemi (toplama, toplama veya sayma) gerçekleştirir. Değişen toplamla kümülatif toplama veya belirli bir zaman penceresi içindeki kayıtları hesaba katan pencere toplama işlemini gerçekleştirebilirsiniz.

Liste 5.5'teki kod, oturum pencereleri içindeki işlem sayısını sayar. İncirde. 5.13 bu eylemler adım adım analiz edilir.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) öğesini çağırarak, hareketsizlik aralığı 20 saniye ve kalıcılık aralığı 15 dakika olan bir oturum penceresi oluşturuyoruz. 20 saniyelik boşta kalma aralığı, uygulamanın, mevcut oturumun bitiminden veya başlangıcından itibaren 20 saniye içinde gelen herhangi bir girişi mevcut (aktif) oturuma dahil edeceği anlamına gelir.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Daha sonra, oturum penceresinde hangi toplama işleminin gerçekleştirilmesi gerektiğini belirtiriz - bu durumda sayın. Gelen bir giriş hareketsizlik penceresinin (tarih/saat damgasının her iki tarafında) dışında kalırsa, uygulama yeni bir oturum oluşturur. Saklama aralığı, bir oturumu belirli bir süre boyunca sürdürmek anlamına gelir ve oturumun etkinlik dışı kalma süresinin ötesine geçen ancak yine de eklenebilen geç verilere izin verir. Ayrıca, birleştirme sonucunda ortaya çıkan yeni oturumun başlangıç ​​ve bitiş tarihleri, en erken ve en geç tarih/saat damgasına karşılık gelir.

Oturumların nasıl çalıştığını görmek için count yönteminden birkaç girişe bakalım (Tablo 5.1).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Kayıtlar geldiğinde, aynı anahtara sahip, bitiş zamanı geçerli tarih/saat damgası - eylemsizlik aralığından daha küçük olan ve başlangıç ​​zamanı geçerli tarih/saat damgası + eylemsizlik aralığından daha büyük olan mevcut oturumları ararız. Bunu dikkate alarak tablodan dört giriş. 5.1 aşağıdaki gibi tek bir oturumda birleştirilir.

1. Kayıt 1 önce gelir, dolayısıyla başlangıç ​​zamanı bitiş zamanına eşit olur ve 00:00:00 olur.

2. Daha sonra giriş 2 gelir ve 23:59:55'ten önce bitmeyen ve 00:00:35'ten geç olmayan oturumları ararız. Kayıt 1'i buluyoruz ve 1. ve 2. oturumları birleştiriyoruz. 1. oturumun başlangıç ​​saatini (daha erken) ve 2. oturumun bitiş saatini (daha sonra) alıyoruz, böylece yeni oturumumuz 00:00:00'da başlayıp 00'da bitiyor: 00:15.

3. Kayıt 3 geldiğinde 00:00:30 ile 00:01:10 arasındaki seansları arıyoruz ve bulamıyoruz. 123-345-654,FFBE anahtarı için 00:00:50'de başlayıp biten ikinci bir oturum ekleyin.

4. Kayıt 4 geliyor ve 23:59:45 ile 00:00:25 arasındaki seansları arıyoruz. Bu kez hem 1. hem de 2. oturum bulunur. Üç oturumun tümü, başlangıç ​​zamanı 00:00:00 ve bitiş zamanı 00:00:15 olan tek bir oturumda birleştirilir.

Bu bölümde anlatılanlardan aşağıdaki önemli nüansları hatırlamakta fayda var:

  • oturumlar sabit boyutlu pencereler değildir. Bir oturumun süresi, belirli bir süre içindeki aktiviteye göre belirlenir;
  • Verilerdeki tarih/saat damgaları, olayın mevcut bir oturuma mı yoksa boşta kalan bir döneme mi denk geldiğini belirler.

Daha sonra bir sonraki pencere tipini tartışacağız: "taklalı" pencereler.

"Yuvarlanan" pencereler

Yuvarlanan pencereler, belirli bir zaman dilimine denk gelen olayları yakalar. Belirli bir şirketin tüm hisse senedi işlemlerini her 20 saniyede bir kaydetmeniz gerektiğini, böylece o zaman dilimindeki tüm olayları topladığınızı düşünün. 20 saniyelik aralığın sonunda pencere döner ve 20 saniyelik yeni bir gözlem aralığına geçer. Şekil 5.14 bu durumu göstermektedir.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Gördüğünüz gibi son 20 saniyede alınan tüm olaylar pencerede yer alıyor. Bu sürenin sonunda yeni bir pencere oluşturulur.

Liste 5.6, her 20 saniyede bir hisse senedi işlemlerini yakalamak için dönen pencerelerin kullanımını gösteren kodu göstermektedir (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java'da bulunur).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
TimeWindows.of yöntem çağrısında yapılan bu küçük değişiklikle, yuvarlanan bir pencere kullanabilirsiniz. Bu örnek, Until() yöntemini çağırmadığından, 24 saatlik varsayılan saklama aralığı kullanılacaktır.

Son olarak, pencere seçeneklerinin sonuncusu olan "atlamalı" pencerelere geçmenin zamanı geldi.

Kayar ("atlama") pencereler

Sürgülü/atlamalı pencereler, devrilen pencerelere benzer, ancak küçük bir farkla. Sürgülü pencereler, son olayları işlemek için yeni bir pencere oluşturmadan önce zaman aralığının sonuna kadar beklemez. Pencere süresinden daha kısa bir bekleme süresinden sonra yeni hesaplamalara başlarlar.

Düşen ve sıçrayan pencereler arasındaki farkları göstermek için borsa işlemlerini sayma örneğine dönelim. Amacımız hâlâ işlem sayısını saymaktır ancak sayacı güncellemeden önce tüm süreyi beklemek istemiyoruz. Bunun yerine sayacı daha kısa aralıklarla güncelleyeceğiz. Örneğin, yine her 20 saniyede bir işlem sayısını sayacağız ancak sayacı Şekil 5'de gösterildiği gibi her 5.15 saniyede bir güncelleyeceğiz. XNUMX. Bu durumda, örtüşen verilere sahip üç sonuç penceresi elde ederiz.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Liste 5.7, kayan pencereleri tanımlama kodunu gösterir (src/main/Java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java'da bulunur).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bir yuvarlanan pencere, AdvanceBy() yöntemine bir çağrı eklenerek atlamalı bir pencereye dönüştürülebilir. Gösterilen örnekte kaydetme aralığı 15 dakikadır.

Bu bölümde toplama sonuçlarının zaman pencereleriyle nasıl sınırlandırılacağını gördünüz. Bu bölümden özellikle şu üç şeyi hatırlamanızı istiyorum:

  • oturum pencerelerinin boyutu zaman aralığına göre değil kullanıcı etkinliğine göre sınırlıdır;
  • "Dönen" pencereler, belirli bir zaman dilimi içindeki olaylara genel bir bakış sağlar;
  • Atlama pencerelerinin süresi sabittir ancak sıklıkla güncellenirler ve tüm pencerelerde çakışan girişler içerebilirler.

Daha sonra, bir bağlantı için KTable'ı tekrar KStream'e nasıl dönüştüreceğimizi öğreneceğiz.

5.3.3. KStream ve KTable nesnelerini bağlama

Bölüm 4'te iki KStream nesnesini bağlamayı tartıştık. Şimdi KTable ve KStream'i nasıl bağlayacağımızı öğrenmeliyiz. Buna aşağıdaki basit nedenden dolayı ihtiyaç duyulabilir. KStream bir kayıt akışıdır ve KTable bir kayıt güncellemeleri akışıdır, ancak bazen KTable'dan gelen güncellemeleri kullanarak kayıt akışına ek bağlam eklemek isteyebilirsiniz.

Borsa işlem sayılarına ilişkin verileri alıp ilgili sektörlere ait borsa haberleriyle birleştirelim. Zaten sahip olduğunuz kod göz önüne alındığında bunu başarmak için yapmanız gerekenler aşağıda açıklanmıştır.

  1. Hisse senedi işlemlerinin sayısına ilişkin verileri içeren bir KTable nesnesini bir KStream'e dönüştürün ve ardından anahtarı, bu hisse senedi sembolüne karşılık gelen endüstri sektörünü belirten anahtarla değiştirin.
  2. Borsa haberleri içeren bir konudaki verileri okuyan bir KTable nesnesi oluşturun. Bu yeni KTable endüstri sektörüne göre kategorize edilecek.
  3. Haber güncellemelerini endüstri sektörüne göre borsa işlemlerinin sayısına ilişkin bilgilerle birleştirin.

Şimdi bu eylem planının nasıl uygulanacağına bakalım.

KTable'ı KStream'e dönüştürün

KTable'ı KStream'e dönüştürmek için aşağıdakileri yapmanız gerekir.

  1. KTable.toStream() yöntemini çağırın.
  2. KStream.map yöntemini çağırarak, anahtarı sektör adıyla değiştirin ve ardından TransactionSummary nesnesini Windowed örneğinden alın.

Bu işlemleri aşağıdaki gibi birbirine zincirleyeceğiz (kod src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java dosyasında bulunabilir) (Liste 5.8).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bir KStream.map işlemi gerçekleştirdiğimiz için, döndürülen KStream örneği bir bağlantıda kullanıldığında otomatik olarak yeniden bölümlendirilir.

Dönüştürme işlemini tamamladık, ardından hisse senedi haberlerini okumak için bir KTable nesnesi oluşturmamız gerekiyor.

Hisse senedi haberleri için KTable'ın oluşturulması

Neyse ki, bir KTable nesnesi oluşturmak yalnızca bir satır kod gerektirir (kod src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java'da bulunabilir) (Liste 5.9).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Ayarlarda Serdes dizisi kullanıldığından, hiçbir Serde nesnesinin belirtilmesine gerek olmadığını belirtmekte fayda var. Ayrıca EARLIEST numaralandırması kullanılarak tablonun en başında kayıtlar doldurulur.

Artık son adım olan bağlantıya geçebiliriz.

Haber güncellemelerini işlem sayısı verileriyle bağlama

Bağlantı oluşturmak zor değil. İlgili sektöre ait hisse senedi haberi olmaması durumunda sol birleştirmeyi kullanacağız (gerekli kod src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java dosyasında bulunabilir) (Liste 5.10).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bu leftJoin operatörü oldukça basittir. Bölüm 4'teki birleştirmelerden farklı olarak, KStream-KTable birleşimi gerçekleştirilirken KTable'da her anahtar için yalnızca bir giriş olduğundan, joinWindow yöntemi kullanılmaz. Böyle bir bağlantı zamanla sınırlı değildir: kayıt ya KTable'dadır ya da yoktur. Ana sonuç: KTable nesnelerini kullanarak KStream'i daha az güncellenen referans verileriyle zenginleştirebilirsiniz.

Şimdi KStream'deki etkinlikleri zenginleştirmenin daha etkili bir yoluna bakacağız.

5.3.4. GlobalKTable nesneleri

Gördüğünüz gibi etkinlik akışlarını zenginleştirmeye veya onlara bağlam eklemeye ihtiyaç var. Bölüm 4'te iki KStream nesnesi arasındaki bağlantıları gördünüz ve önceki bölümde KStream ile KTable arasındaki bağlantıyı gördünüz. Tüm bu durumlarda, anahtarları yeni bir türe veya değere eşlerken veri akışını yeniden bölümlemek gerekir. Yeniden bölümlendirme bazen açıkça yapılır, bazen de Kafka Streams bunu otomatik olarak yapar. Anahtarlar değiştiğinden ve kayıtların yeni bölümlerde sonlanması gerektiğinden yeniden bölümleme gereklidir, aksi takdirde bağlantı imkansız olacaktır (bu, Bölüm 4'te, 4.2.4 alt bölümündeki “Verilerin yeniden bölümlenmesi” bölümünde tartışılmıştır).

Yeniden bölümlemenin bir maliyeti vardır

Yeniden bölümleme maliyet gerektirir - ara konuların oluşturulması, yinelenen verilerin başka bir konuda saklanması için ek kaynak maliyetleri; bu aynı zamanda bu konuya yazma ve okuma nedeniyle artan gecikme anlamına da gelir. Ayrıca, birden fazla boyut veya boyutta birleştirme yapmanız gerekiyorsa birleştirmeleri zincirlemeniz, kayıtları yeni anahtarlarla eşlemeniz ve yeniden bölümlendirme işlemini yeniden çalıştırmanız gerekir.

Daha küçük veri kümelerine bağlanma

Bazı durumlarda bağlanacak referans verilerinin hacmi nispeten küçüktür, bu nedenle tam kopyaları her düğüme yerel olarak kolayca sığabilir. Bu gibi durumlar için Kafka Streams GlobalKTable sınıfını sağlar.

GlobalKTable örnekleri benzersizdir çünkü uygulama tüm verileri düğümlerin her birine kopyalar. Ve tüm veriler her düğümde mevcut olduğundan, olay akışının tüm bölümler tarafından kullanılabilmesi için referans veri anahtarına göre bölümlenmesine gerek yoktur. Ayrıca GlobalKTable nesnelerini kullanarak anahtarsız birleştirmeler de yapabilirsiniz. Bu özelliği göstermek için önceki örneklerden birine geri dönelim.

KStream nesnelerini GlobalKTable nesnelerine bağlama

Alt bölüm 5.3.2'de, alıcılar tarafından takas işlemlerinin pencere birleştirmesini gerçekleştirdik. Bu toplamanın sonuçları şuna benziyordu:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Bu sonuçlar amacımıza hizmet etse de müşterinin adı ve tam şirket adının da görüntülenmesi daha faydalı olurdu. Müşteri adını ve şirket adını eklemek için normal birleştirmeler yapabilirsiniz ancak iki önemli eşleme ve yeniden bölümleme yapmanız gerekecektir. GlobalKTable ile bu tür operasyonların maliyetinden kurtulabilirsiniz.

Bunu yapmak için Liste 5.11'deki countStream nesnesini kullanacağız (ilgili kod src/main/Java/bbejeck/chapter_5/GlobalKTableExample.java'da bulunabilir) ve onu iki GlobalKTable nesnesine bağlayacağız.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bunu daha önce de tartışmıştık, o yüzden tekrarlamayacağım. Ancak toStream().map işlevindeki kodun, okunabilirlik açısından satır içi lambda ifadesi yerine bir işlev nesnesine soyutlandığını belirtmek isterim.

Bir sonraki adım GlobalKTable'ın iki örneğini bildirmektir (gösterilen kod src/main/Java/bbejeck/chapter_5/GlobalKTableExample.java dosyasında bulunabilir) (Liste 5.12).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"

Lütfen konu adlarının numaralandırılmış türler kullanılarak tanımlandığını unutmayın.

Artık tüm bileşenler hazır olduğuna göre geriye kalan tek şey bağlantı kodunu yazmaktır (bu kod src/main/Java/bbejeck/chapter_5/GlobalKTableExample.java dosyasında bulunabilir) (Liste 5.13).

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Bu kodda iki birleşim olmasına rağmen sonuçlarının hiçbiri ayrı ayrı kullanılmadığından zincirlenirler. Sonuçlar tüm işlemin sonunda görüntülenir.

Yukarıdaki birleştirme işlemini çalıştırdığınızda aşağıdaki gibi sonuçlar alırsınız:

{customer='Barney, Smith' company="Exxon", transactions= 17}

İşin özü değişmedi ama bu sonuçlar daha net görünüyor.

Bölüm 4'e kadar geri sayarsanız, zaten çeşitli bağlantı türlerini çalışırken görmüşsünüzdür. Bunlar tabloda listelenmiştir. 5.2. Bu tablo, Kafka Akışlarının 1.0.0 sürümünden itibaren bağlantı yeteneklerini yansıtmaktadır; Gelecek sürümlerde bir şeyler değişebilir.

“Kafka Akımları İş Başında” kitabı. Gerçek zamanlı çalışmaya yönelik uygulamalar ve mikro hizmetler"
Konuyu toparlamak için temelleri özetleyelim: Yerel durumu kullanarak olay akışlarını (KStream) bağlayabilir ve akışları (KTable) güncelleyebilirsiniz. Alternatif olarak, referans verilerinin boyutu çok büyük değilse GlobalKTable nesnesini kullanabilirsiniz. GlobalKTables, tüm bölümleri her Kafka Streams uygulama düğümüne kopyalayarak, anahtarın hangi bölüme karşılık geldiğine bakılmaksızın tüm verilerin kullanılabilir olmasını sağlar.

Daha sonra, Kafka konusundan veri tüketmeden durum değişikliklerini gözlemleyebileceğimiz Kafka Akışları özelliğini göreceğiz.

5.3.5. Sorgulanabilir durum

Durumu içeren çeşitli işlemleri zaten gerçekleştirdik ve sonuçları her zaman konsola (geliştirme amacıyla) çıktı olarak verdik veya bunları bir konuya yazdık (üretim amacıyla). Sonuçları bir konuya yazarken, bunları görüntülemek için Kafka tüketicisini kullanmanız gerekir.

Bu konulardan veri okumak, bir tür somutlaştırılmış görünüm olarak düşünülebilir. Amaçlarımız açısından, Wikipedia'daki somutlaştırılmış görünüm tanımını kullanabiliriz: “...bir sorgunun sonuçlarını içeren fiziksel bir veritabanı nesnesi. Örneğin, uzak verilerin yerel bir kopyası veya bir tablonun veya birleştirme sonuçlarının satır ve/veya sütunlarının bir alt kümesi veya toplama yoluyla elde edilen bir özet tablo olabilir” (https://en.wikipedia.org/wiki) /Materialized_view).

Kafka Streams ayrıca durum depolarında etkileşimli sorgular çalıştırmanıza olanak tanıyarak bu somutlaştırılmış görünümleri doğrudan okumanıza olanak tanır. Durum deposuna yapılan sorgunun salt okunur bir işlem olduğuna dikkat etmek önemlidir. Bu, uygulamanız verileri işlerken yanlışlıkla durumun tutarsız hale gelmesi konusunda endişelenmenize gerek kalmamasını sağlar.

Durum depolarını doğrudan sorgulama yeteneği önemlidir. Bu, ilk önce Kafka tüketicisinden veri almak zorunda kalmadan kontrol paneli uygulamaları oluşturabileceğiniz anlamına gelir. Ayrıca tekrar veri yazmaya gerek kalmaması nedeniyle uygulamanın verimliliği de artar:

  • verilerin yerelliği sayesinde bunlara hızlı bir şekilde erişilebilir;
  • harici depolama birimine yazılmadığından verilerin kopyalanması ortadan kaldırılır.

Hatırlamanızı istediğim en önemli şey, uygulamanızın içinden doğrudan durumu sorgulayabilmenizdir. Bunun size sağladığı fırsatlar abartılamaz. Uygulama için Kafka'dan veri tüketmek ve kayıtları bir veritabanında depolamak yerine durum depolarını aynı sonuçla sorgulayabilirsiniz. Durum depolarına doğrudan sorgulama, daha az kod (tüketici yok) ve daha az yazılım (sonuçları saklamak için bir veritabanı tablosuna gerek yok) anlamına gelir.

Bu bölümde epeyce yol kat ettik, dolayısıyla devlet mağazalarına yönelik etkileşimli sorgu tartışmamızı şimdilik bırakacağız. Ancak endişelenmeyin: Bölüm 9'da etkileşimli sorgular içeren basit bir kontrol paneli uygulaması oluşturacağız. Etkileşimli sorguları ve bunları Kafka Streams uygulamalarına nasıl ekleyebileceğinizi göstermek için bu ve önceki bölümlerdeki bazı örnekleri kullanacağız.

Özet

  • KStream nesneleri, bir veritabanına eklenenlerle karşılaştırılabilecek olay akışlarını temsil eder. KTable nesneleri, daha çok bir veritabanındaki güncellemelere benzer şekilde güncelleme akışlarını temsil eder. KTable nesnesinin boyutu artmıyor, eski kayıtların yerini yenileri alıyor.
  • Toplama işlemleri için KTable nesneleri gereklidir.
  • Pencereleme işlemlerini kullanarak, birleştirilmiş verileri zaman dilimlerine bölebilirsiniz.
  • GlobalKTable nesneleri sayesinde referans verilerine bölümlemeden bağımsız olarak uygulamanın herhangi bir yerinden erişebilirsiniz.
  • KStream, KTable ve GlobalKTable nesneleri arasındaki bağlantılar mümkündür.

Şu ana kadar üst düzey KStream DSL'yi kullanarak Kafka Streams uygulamaları oluşturmaya odaklandık. Üst düzey yaklaşım, düzenli ve özlü programlar oluşturmanıza olanak tanısa da, bunu kullanmak bir ödünleşimi temsil eder. DSL KStream ile çalışmak, kontrol derecesini azaltarak kodunuzun özlülüğünü artırmak anlamına gelir. Bir sonraki bölümde, düşük seviyeli işleyici düğüm API'sine bakacağız ve diğer dengelemeleri deneyeceğiz. Programlar eskisinden daha uzun olacak ancak ihtiyaç duyabileceğimiz hemen hemen her işleyici düğümü oluşturabileceğiz.

→ Kitapla ilgili daha fazla ayrıntıya şuradan ulaşılabilir: yayıncının web sitesi

→ Habrozhiteli için kupon kullanarak %25 indirim - Kafka Akışları

→ Kitabın basılı versiyonu için ödeme yapıldığında e-posta yoluyla elektronik kitap gönderilecektir.

Kaynak: habr.com

Yorum ekle