Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực" Xin chào cư dân Khabro! Cuốn sách này phù hợp cho bất kỳ nhà phát triển nào muốn hiểu về xử lý luồng. Hiểu về lập trình phân tán sẽ giúp bạn hiểu rõ hơn về Kafka và Kafka Streams. Sẽ thật tuyệt nếu biết chính khung Kafka, nhưng điều này là không cần thiết: ​​Tôi sẽ cho bạn biết mọi thứ bạn cần. Các nhà phát triển Kafka có kinh nghiệm cũng như những người mới làm quen sẽ học cách tạo các ứng dụng xử lý luồng thú vị bằng thư viện Kafka Streams trong cuốn sách này. Các nhà phát triển Java trung cấp và cao cấp đã quen thuộc với các khái niệm như tuần tự hóa sẽ học cách áp dụng các kỹ năng của họ để tạo các ứng dụng Kafka Streams. Mã nguồn của cuốn sách được viết bằng Java 8 và sử dụng đáng kể cú pháp biểu thức lambda Java 8, vì vậy việc biết cách làm việc với các hàm lambda (ngay cả bằng ngôn ngữ lập trình khác) sẽ rất hữu ích.

Trích đoạn. 5.3. Hoạt động tổng hợp và cửa sổ

Trong phần này, chúng ta sẽ chuyển sang khám phá những phần hứa hẹn nhất của Kafka Streams. Cho đến nay chúng tôi đã đề cập đến các khía cạnh sau của Luồng Kafka:

  • tạo ra một cấu trúc liên kết xử lý;
  • sử dụng trạng thái trong các ứng dụng phát trực tuyến;
  • thực hiện kết nối luồng dữ liệu;
  • sự khác biệt giữa luồng sự kiện (KStream) và luồng cập nhật (KTable).

Trong các ví dụ sau, chúng tôi sẽ kết hợp tất cả các yếu tố này lại với nhau. Bạn cũng sẽ tìm hiểu về cửa sổ, một tính năng tuyệt vời khác của các ứng dụng phát trực tuyến. Ví dụ đầu tiên của chúng tôi sẽ là một sự tổng hợp đơn giản.

5.3.1. Tổng hợp doanh số bán cổ phiếu theo ngành

Tổng hợp và nhóm là những công cụ quan trọng khi làm việc với dữ liệu truyền trực tuyến. Việc kiểm tra hồ sơ cá nhân khi chúng được nhận thường không đầy đủ. Để trích xuất thông tin bổ sung từ dữ liệu, cần phải nhóm và kết hợp chúng.

Trong ví dụ này, bạn sẽ hóa trang thành một nhà giao dịch hàng ngày, người cần theo dõi khối lượng bán cổ phiếu của các công ty trong một số ngành. Cụ thể, bạn quan tâm đến 5 công ty có doanh số bán cổ phần lớn nhất trong mỗi ngành.

Việc tổng hợp như vậy sẽ yêu cầu một số bước sau để chuyển dữ liệu sang dạng mong muốn (nói theo thuật ngữ chung).

  1. Tạo nguồn dựa trên chủ đề để xuất bản thông tin giao dịch chứng khoán thô. Chúng ta sẽ phải ánh xạ một đối tượng thuộc loại StockTransaction tới một đối tượng thuộc loại ShareVolume. Vấn đề là đối tượng StockTransaction chứa siêu dữ liệu bán hàng nhưng chúng ta chỉ cần dữ liệu về số lượng cổ phiếu được bán.
  2. Nhóm dữ liệu ShareVolume theo mã chứng khoán. Sau khi được nhóm theo ký hiệu, bạn có thể thu gọn dữ liệu này thành tổng phụ của khối lượng bán hàng tồn kho. Điều đáng chú ý là phương thức KStream.groupBy trả về một thể hiện của loại KGroupedStream. Và bạn có thể lấy một phiên bản KTable bằng cách gọi thêm phương thức KGroupedStream.reduce.

Giao diện KGroupedStream là gì

Các phương thức KStream.groupBy và KStream.groupByKey trả về một phiên bản của KGroupedStream. KGroupedStream là đại diện trung gian của luồng sự kiện sau khi nhóm theo khóa. Nó hoàn toàn không nhằm mục đích làm việc trực tiếp với nó. Thay vào đó, KGroupedStream được sử dụng cho các hoạt động tổng hợp, luôn dẫn đến KTable. Và vì kết quả của các hoạt động tổng hợp là một KTable và chúng sử dụng kho lưu trữ trạng thái nên có thể không phải tất cả các bản cập nhật đều được gửi tiếp theo trong quy trình.

Phương thức KTable.groupBy trả về một KGroupedTable tương tự - một biểu diễn trung gian của luồng cập nhật, được nhóm lại theo khóa.

Chúng ta hãy nghỉ ngơi một chút và nhìn vào hình. 5.9, cho thấy những gì chúng tôi đã đạt được. Cấu trúc liên kết này chắc hẳn đã rất quen thuộc với bạn.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Bây giờ chúng ta hãy xem mã cho cấu trúc liên kết này (có thể tìm thấy nó trong tệp src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Liệt kê 5.2).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Mã đã cho được phân biệt bởi tính ngắn gọn và khối lượng lớn các hành động được thực hiện trong một số dòng. Bạn có thể nhận thấy điều gì đó mới trong tham số đầu tiên của phương thức builder.stream: một giá trị của loại enum AutoOffsetReset.EARLIEST (cũng có LATEST), được đặt bằng phương thức Consumed.withOffsetResetPolicy. Loại liệt kê này có thể được sử dụng để chỉ định chiến lược đặt lại bù trừ cho từng KStream hoặc KTable và được ưu tiên hơn tùy chọn đặt lại bù trừ từ cấu hình.

GroupByKey và GroupBy

Giao diện KStream có hai phương thức để nhóm các bản ghi: GroupByKey và GroupBy. Cả hai đều trả về một KGroupedTable, vì vậy bạn có thể thắc mắc sự khác biệt giữa chúng là gì và khi nào nên sử dụng cái nào?

Phương thức GroupByKey được sử dụng khi các khóa trong KStream không trống. Và quan trọng nhất, cờ “yêu cầu phân vùng lại” chưa bao giờ được đặt.

Phương thức GroupBy giả định rằng bạn đã thay đổi các khóa nhóm, do đó cờ phân vùng được đặt thành true. Thực hiện nối, tổng hợp, v.v. sau phương thức GroupBy sẽ dẫn đến việc phân vùng lại tự động.
Tóm tắt: Bất cứ khi nào có thể, bạn nên sử dụng GroupByKey thay vì GroupBy.

Rõ ràng các phương thức mapValues ​​​​và groupBy làm gì, vì vậy chúng ta hãy xem phương thức sum() (có trong src/main/java/bbejeck/model/ShareVolume.java) (Danh sách 5.3).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Phương thức ShareVolume.sum trả về tổng doanh số bán hàng tồn kho và kết quả của toàn bộ chuỗi tính toán là một đối tượng KTable . Bây giờ bạn đã hiểu vai trò của KTable. Khi đối tượng ShareVolume đến, đối tượng KTable tương ứng sẽ lưu trữ bản cập nhật hiện tại mới nhất. Điều quan trọng cần nhớ là tất cả các bản cập nhật đều được phản ánh trong shareVolumeKTable trước đó, nhưng không phải tất cả đều được gửi thêm.

Tiếp theo, bằng cách sử dụng KTable này, chúng tôi tổng hợp (theo số lượng cổ phiếu được giao dịch) để đưa ra năm công ty có khối lượng cổ phiếu được giao dịch cao nhất trong mỗi ngành. Hành động của chúng tôi trong trường hợp này sẽ tương tự như hành động của tập hợp đầu tiên.

  1. Thực hiện một thao tác groupBy khác để nhóm các đối tượng ShareVolume riêng lẻ theo ngành.
  2. Bắt đầu tóm tắt các đối tượng ShareVolume. Lần này đối tượng tổng hợp là hàng đợi ưu tiên có kích thước cố định. Trong hàng đợi có kích thước cố định này, chỉ có năm công ty có số lượng cổ phiếu bán ra lớn nhất được giữ lại.
  3. Ánh xạ các hàng đợi từ đoạn trước tới một giá trị chuỗi và trả về năm cổ phiếu được giao dịch nhiều nhất theo số lượng theo ngành.
  4. Viết kết quả dưới dạng chuỗi vào chủ đề.

Trong bộ lễ phục. Hình 5.10 cho thấy biểu đồ cấu trúc liên kết luồng dữ liệu. Như bạn có thể thấy, vòng xử lý thứ hai khá đơn giản.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Bây giờ chúng ta đã hiểu rõ cấu trúc của vòng xử lý thứ hai này, chúng ta có thể chuyển sang mã nguồn của nó (bạn sẽ tìm thấy nó trong tệp src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Liệt kê 5.4) .

Trình khởi tạo này chứa một biến cố địnhQueue. Đây là một đối tượng tùy chỉnh là một bộ chuyển đổi cho java.util.TreeSet được sử dụng để theo dõi N kết quả hàng đầu theo thứ tự cổ phiếu được giao dịch giảm dần.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Bạn đã thấy các lệnh gọi groupBy và mapValues ​​​​nên chúng ta sẽ không đi sâu vào các lệnh gọi đó (chúng ta đang gọi phương thức KTable.toStream vì phương thức KTable.print không được dùng nữa). Nhưng bạn chưa thấy phiên bản KTable của tổng hợp(), vì vậy chúng ta sẽ dành một chút thời gian để thảo luận về vấn đề đó.

Như bạn nhớ, điều làm KTable khác biệt là các bản ghi có cùng khóa được coi là bản cập nhật. KTable thay thế mục cũ bằng mục mới. Việc tổng hợp diễn ra theo cách tương tự: các bản ghi mới nhất có cùng khóa được tổng hợp. Khi một bản ghi đến, nó sẽ được thêm vào thể hiện của lớp FixedSizePriorityQueue bằng cách sử dụng một bộ cộng (tham số thứ hai trong lệnh gọi phương thức tổng hợp), nhưng nếu một bản ghi khác đã tồn tại với cùng khóa thì bản ghi cũ sẽ bị xóa bằng cách sử dụng bộ trừ (tham số thứ ba trong cuộc gọi phương thức tổng hợp).

Tất cả điều này có nghĩa là công cụ tổng hợp của chúng tôi, FixSizePriorityQueue, không tổng hợp tất cả các giá trị bằng một khóa mà lưu trữ tổng di chuyển của số lượng N loại cổ phiếu được giao dịch nhiều nhất. Mỗi mục nhập chứa tổng số cổ phiếu đã bán cho đến nay. KTable sẽ cung cấp cho bạn thông tin về cổ phiếu của công ty nào hiện được giao dịch nhiều nhất mà không yêu cầu tổng hợp từng đợt cập nhật.

Chúng tôi đã học cách làm hai điều quan trọng:

  • nhóm các giá trị trong KTable bằng một khóa chung;
  • thực hiện các hoạt động hữu ích như tổng hợp và tổng hợp các giá trị được nhóm này.

Biết cách thực hiện các thao tác này là điều quan trọng để hiểu ý nghĩa của dữ liệu di chuyển qua ứng dụng Kafka Streams và hiểu thông tin mà nó mang theo.

Chúng tôi cũng đã tập hợp một số khái niệm chính được thảo luận trước đó trong cuốn sách này. Trong Chương 4, chúng ta đã thảo luận tầm quan trọng của trạng thái cục bộ, có khả năng chịu lỗi đối với một ứng dụng phát trực tuyến. Ví dụ đầu tiên trong chương này đã chứng minh tại sao trạng thái địa phương lại quan trọng đến vậy—nó mang lại cho bạn khả năng theo dõi những thông tin bạn đã xem. Truy cập cục bộ tránh được tình trạng trễ mạng, giúp ứng dụng hoạt động hiệu quả hơn và ít bị lỗi hơn.

Khi thực hiện bất kỳ thao tác tổng hợp hoặc tổng hợp nào, bạn phải chỉ định tên của kho lưu trữ trạng thái. Các hoạt động tổng hợp và tổng hợp trả về một phiên bản KTable và KTable sử dụng bộ lưu trữ trạng thái để thay thế các kết quả cũ bằng kết quả mới. Như bạn đã thấy, không phải tất cả các bản cập nhật đều được gửi xuống hệ thống và điều này rất quan trọng vì các hoạt động tổng hợp được thiết kế để tạo ra thông tin tóm tắt. Nếu bạn không áp dụng trạng thái cục bộ, KTable sẽ chuyển tiếp tất cả các kết quả tổng hợp và tổng hợp.

Tiếp theo, chúng ta sẽ xem xét việc thực hiện các thao tác như tổng hợp trong một khoảng thời gian cụ thể - còn gọi là các thao tác cửa sổ.

5.3.2. Thao tác cửa sổ

Trong phần trước, chúng tôi đã giới thiệu về phép tích chập và tập hợp trượt. Ứng dụng này thực hiện việc tổng hợp doanh số bán cổ phiếu liên tục, sau đó là tổng hợp năm cổ phiếu được giao dịch nhiều nhất trên sàn giao dịch.

Đôi khi việc tổng hợp và tổng hợp kết quả liên tục như vậy là cần thiết. Và đôi khi bạn chỉ cần thực hiện các thao tác trong một khoảng thời gian nhất định. Ví dụ: tính toán có bao nhiêu giao dịch trao đổi được thực hiện với cổ phiếu của một công ty cụ thể trong 10 phút qua. Hoặc có bao nhiêu người dùng đã nhấp vào biểu ngữ quảng cáo mới trong 15 phút qua. Một ứng dụng có thể thực hiện các thao tác như vậy nhiều lần nhưng kết quả chỉ áp dụng cho các khoảng thời gian nhất định (khoảng thời gian).

Đếm giao dịch trao đổi của người mua

Trong ví dụ tiếp theo, chúng tôi sẽ theo dõi các giao dịch chứng khoán của nhiều nhà giao dịch—tổ chức lớn hoặc nhà tài trợ cá nhân thông minh.

Có hai lý do có thể xảy ra cho việc theo dõi này. Một trong số đó là nhu cầu biết những người dẫn đầu thị trường đang mua/bán những gì. Nếu những tay chơi lớn và những nhà đầu tư sành sỏi này nhìn thấy cơ hội thì việc làm theo chiến lược của họ là điều hợp lý. Lý do thứ hai là mong muốn phát hiện bất kỳ dấu hiệu nào có thể có của giao dịch nội gián bất hợp pháp. Để làm được điều này, bạn sẽ cần phân tích mối tương quan giữa mức tăng đột biến về doanh số bán hàng lớn với các thông cáo báo chí quan trọng.

Việc theo dõi như vậy bao gồm các bước sau:

  • tạo luồng để đọc từ chủ đề giao dịch chứng khoán;
  • nhóm các hồ sơ đến theo ID người mua và mã chứng khoán. Việc gọi phương thức groupBy trả về một thể hiện của lớp KGroupedStream;
  • Phương thức KGroupedStream.windowedBy trả về một luồng dữ liệu được giới hạn trong một cửa sổ thời gian, cho phép tổng hợp các cửa sổ. Tùy thuộc vào loại cửa sổ, TimeWindowedKStream hoặc SessionWindowedKStream được trả về;
  • số lượng giao dịch cho hoạt động tổng hợp. Luồng dữ liệu cửa sổ xác định liệu một bản ghi cụ thể có được tính đến trong số lượng này hay không;
  • ghi kết quả vào một chủ đề hoặc xuất chúng ra bảng điều khiển trong quá trình phát triển.

Cấu trúc liên kết của ứng dụng này rất đơn giản nhưng một bức tranh rõ ràng về nó sẽ rất hữu ích. Chúng ta hãy nhìn vào hình. 5.11.

Tiếp theo, chúng ta sẽ xem xét chức năng của các hoạt động của cửa sổ và mã tương ứng.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"

Các loại cửa sổ

Có ba loại cửa sổ trong Kafka Streams:

  • phiên họp;
  • “nhào lộn”;
  • trượt/nhảy.

Việc lựa chọn cái nào phụ thuộc vào yêu cầu kinh doanh của bạn. Cửa sổ xoay và nhảy bị giới hạn thời gian, trong khi cửa sổ phiên bị giới hạn bởi hoạt động của người dùng—thời lượng của (các) phiên chỉ được xác định bởi mức độ hoạt động của người dùng. Điều chính cần nhớ là tất cả các loại cửa sổ đều dựa trên dấu ngày/giờ của các mục nhập chứ không phải thời gian hệ thống.

Tiếp theo, chúng tôi triển khai cấu trúc liên kết của mình với từng loại cửa sổ. Mã hoàn chỉnh sẽ chỉ được cung cấp trong ví dụ đầu tiên; đối với các loại cửa sổ khác sẽ không có gì thay đổi ngoại trừ kiểu hoạt động của cửa sổ.

Cửa sổ phiên

Cửa sổ phiên rất khác với tất cả các loại cửa sổ khác. Chúng bị giới hạn không nhiều theo thời gian mà bởi hoạt động của người dùng (hoặc hoạt động của tổ chức mà bạn muốn theo dõi). Cửa sổ phiên được phân định bằng khoảng thời gian không hoạt động.

Hình 5.12 minh họa khái niệm cửa sổ phiên. Phiên nhỏ hơn sẽ hợp nhất với phiên ở bên trái. Và phiên bên phải sẽ tách biệt vì nó diễn ra sau một thời gian dài không hoạt động. Cửa sổ phiên dựa trên hoạt động của người dùng nhưng sử dụng dấu ngày/giờ từ các mục nhập để xác định mục nhập đó thuộc về phiên nào.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"

Sử dụng cửa sổ phiên để theo dõi giao dịch chứng khoán

Hãy sử dụng cửa sổ phiên để nắm bắt thông tin về các giao dịch trao đổi. Việc triển khai các cửa sổ phiên được hiển thị trong Liệt kê 5.5 (có thể tìm thấy trong src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Bạn đã thấy hầu hết các thao tác trong cấu trúc liên kết này, do đó không cần phải xem lại chúng ở đây. Nhưng cũng có một số yếu tố mới ở đây mà bây giờ chúng ta sẽ thảo luận.

Bất kỳ hoạt động nhómBy nào thường thực hiện một số loại hoạt động tổng hợp (tổng hợp, cuộn lên hoặc đếm). Bạn có thể thực hiện tổng hợp tích lũy với tổng đang chạy hoặc tổng hợp theo cửa sổ, tính đến các bản ghi trong một khoảng thời gian được chỉ định.

Mã trong Liệt kê 5.5 đếm số lượng giao dịch trong cửa sổ phiên. Trong bộ lễ phục. 5.13 những hành động này được phân tích từng bước.

Bằng cách gọi windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) chúng ta tạo một cửa sổ phiên với khoảng thời gian không hoạt động là 20 giây và khoảng thời gian duy trì là 15 phút. Khoảng thời gian không hoạt động là 20 giây có nghĩa là ứng dụng sẽ bao gồm bất kỳ mục nào đến trong vòng 20 giây kể từ khi kết thúc hoặc bắt đầu phiên hiện tại vào phiên (hoạt động) hiện tại.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Tiếp theo, chúng tôi chỉ định thao tác tổng hợp nào cần được thực hiện trong cửa sổ phiên - trong trường hợp này là đếm. Nếu mục nhập đến nằm ngoài cửa sổ không hoạt động (cả hai bên của dấu ngày/giờ), ứng dụng sẽ tạo một phiên mới. Khoảng thời gian lưu giữ có nghĩa là duy trì một phiên trong một khoảng thời gian nhất định và cho phép dữ liệu trễ vượt quá khoảng thời gian không hoạt động của phiên nhưng vẫn có thể được đính kèm. Ngoài ra, thời điểm bắt đầu và kết thúc phiên mới do việc hợp nhất tương ứng với dấu ngày/giờ sớm nhất và mới nhất.

Hãy xem xét một số mục từ phương pháp đếm để biết phiên hoạt động như thế nào (Bảng 5.1).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Khi các bản ghi đến, chúng tôi tìm kiếm các phiên hiện có có cùng khóa, thời gian kết thúc nhỏ hơn dấu ngày/giờ hiện tại - khoảng thời gian không hoạt động và thời gian bắt đầu lớn hơn dấu ngày/giờ hiện tại + khoảng thời gian không hoạt động. Có tính đến điều này, bốn mục từ bảng. 5.1 được hợp nhất thành một phiên duy nhất như sau.

1. Bản ghi 1 đến trước nên thời gian bắt đầu bằng thời gian kết thúc và là 00:00:00.

2. Tiếp theo, mục 2 xuất hiện và chúng tôi tìm kiếm các phiên kết thúc không sớm hơn 23:59:55 và bắt đầu không muộn hơn 00:00:35. Chúng ta tìm bản ghi 1 và kết hợp phiên 1 và 2. Chúng ta lấy thời gian bắt đầu của phiên 1 (sớm hơn) và thời gian kết thúc của phiên 2 (sau) để phiên mới của chúng ta bắt đầu lúc 00:00:00 và kết thúc lúc 00: 00:15.

3. Bản ghi 3 đến, chúng tôi tìm kiếm các phiên từ 00:00:30 đến 00:01:10 và không tìm thấy bất kỳ phiên nào. Thêm phiên thứ hai cho khóa 123-345-654,FFBE, bắt đầu và kết thúc lúc 00:00:50.

4. Bản ghi 4 đến và chúng tôi đang tìm kiếm các phiên từ 23:59:45 đến 00:00:25. Lần này cả hai phiên 1 và 2 đều được kết hợp thành một, với thời gian bắt đầu là 00:00:00 và thời gian kết thúc là 00:00:15.

Từ những gì được mô tả trong phần này, cần nhớ các sắc thái quan trọng sau:

  • phiên không phải là cửa sổ có kích thước cố định. Thời lượng của một phiên được xác định bởi hoạt động trong một khoảng thời gian nhất định;
  • Dấu ngày/giờ trong dữ liệu xác định xem sự kiện có nằm trong phiên hiện có hay trong khoảng thời gian không hoạt động hay không.

Tiếp theo chúng ta sẽ thảo luận về loại cửa sổ tiếp theo - cửa sổ "nhào lộn".

Cửa sổ "lộn xộn"

Cửa sổ lộn xộn ghi lại các sự kiện xảy ra trong một khoảng thời gian nhất định. Hãy tưởng tượng rằng bạn cần nắm bắt tất cả các giao dịch chứng khoán của một công ty nhất định cứ sau 20 giây, do đó bạn thu thập tất cả các sự kiện trong khoảng thời gian đó. Khi kết thúc khoảng thời gian 20 giây, cửa sổ sẽ cuộn lại và chuyển sang khoảng thời gian quan sát 20 giây mới. Hình 5.14 minh họa tình huống này.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Như bạn có thể thấy, tất cả các sự kiện nhận được trong 20 giây qua đều được đưa vào cửa sổ. Vào cuối khoảng thời gian này, một cửa sổ mới sẽ được tạo.

Liệt kê 5.6 hiển thị mã thể hiện việc sử dụng các cửa sổ lộn xộn để nắm bắt các giao dịch chứng khoán cứ sau 20 giây (có trong src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Với thay đổi nhỏ này đối với lệnh gọi phương thức TimeWindows.of, bạn có thể sử dụng cửa sổ lật. Ví dụ này không gọi phương thức Until() nên khoảng thời gian lưu giữ mặc định là 24 giờ sẽ được sử dụng.

Cuối cùng, đã đến lúc chuyển sang tùy chọn cuối cùng của cửa sổ - cửa sổ "nhảy".

Cửa sổ trượt ("nhảy")

Cửa sổ trượt/nhảy cũng tương tự như cửa sổ lật nhưng có một chút khác biệt. Cửa sổ trượt không đợi đến hết khoảng thời gian trước khi tạo cửa sổ mới để xử lý các sự kiện gần đây. Chúng bắt đầu tính toán mới sau khoảng thời gian chờ ít hơn thời lượng cửa sổ.

Để minh họa sự khác biệt giữa cửa sổ lộn nhào và cửa sổ nhảy, chúng ta hãy quay lại ví dụ đếm các giao dịch trên thị trường chứng khoán. Mục tiêu của chúng tôi vẫn là đếm số lượng giao dịch, nhưng chúng tôi không muốn đợi toàn bộ thời gian trước khi cập nhật bộ đếm. Thay vào đó, chúng tôi sẽ cập nhật bộ đếm trong khoảng thời gian ngắn hơn. Ví dụ: chúng tôi vẫn sẽ đếm số lượng giao dịch cứ sau 20 giây, nhưng cập nhật bộ đếm cứ sau 5 giây, như trong Hình. 5.15. Trong trường hợp này, chúng tôi kết thúc với ba cửa sổ kết quả có dữ liệu chồng chéo.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Liệt kê 5.7 hiển thị mã để xác định các cửa sổ trượt (có trong src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Cửa sổ giảm dần có thể được chuyển đổi thành cửa sổ nhảy bằng cách thêm lệnh gọi vào phương thức AdvanceBy(). Trong ví dụ hiển thị, khoảng thời gian lưu là 15 phút.

Bạn đã thấy trong phần này cách giới hạn kết quả tổng hợp trong các khoảng thời gian. Đặc biệt, tôi muốn bạn nhớ ba điều sau đây trong phần này:

  • kích thước của cửa sổ phiên bị giới hạn không phải theo khoảng thời gian mà bởi hoạt động của người dùng;
  • cửa sổ “nhào lộn” cung cấp cái nhìn tổng quan về các sự kiện trong một khoảng thời gian nhất định;
  • Thời lượng của các cửa sổ nhảy là cố định nhưng chúng được cập nhật thường xuyên và có thể chứa các mục trùng lặp trong tất cả các cửa sổ.

Tiếp theo, chúng ta sẽ tìm hiểu cách chuyển đổi KTable trở lại KStream để kết nối.

5.3.3. Kết nối các đối tượng KStream và KTable

Trong Chương 4, chúng ta đã thảo luận về việc kết nối hai đối tượng KStream. Bây giờ chúng ta phải tìm hiểu cách kết nối KTable và KStream. Điều này có thể cần thiết vì lý do đơn giản sau đây. KStream là một luồng bản ghi và KTable là một luồng cập nhật bản ghi nhưng đôi khi bạn có thể muốn thêm ngữ cảnh bổ sung vào luồng bản ghi bằng cách sử dụng các bản cập nhật từ KTable.

Hãy lấy dữ liệu về số lượng giao dịch trên sàn chứng khoán và kết hợp chúng với tin tức trên sàn giao dịch chứng khoán cho các ngành liên quan. Đây là những gì bạn cần làm để đạt được điều này với mã bạn đã có.

  1. Chuyển đổi đối tượng KTable có dữ liệu về số lượng giao dịch chứng khoán thành KStream, sau đó thay khóa bằng khóa chỉ ngành tương ứng với ký hiệu chứng khoán này.
  2. Tạo đối tượng KTable để đọc dữ liệu từ một chủ đề có tin tức về thị trường chứng khoán. KTable mới này sẽ được phân loại theo lĩnh vực công nghiệp.
  3. Kết nối cập nhật tin tức với thông tin về số lượng giao dịch chứng khoán theo ngành.

Bây giờ hãy xem cách thực hiện kế hoạch hành động này.

Chuyển đổi KTable sang KStream

Để chuyển đổi KTable sang KStream bạn cần làm như sau.

  1. Gọi phương thức KTable.toStream().
  2. Bằng cách gọi phương thức KStream.map, thay thế khóa bằng tên ngành, sau đó truy xuất đối tượng TransactionSummary từ phiên bản Windowed.

Chúng ta sẽ xâu chuỗi các thao tác này lại với nhau như sau (mã có thể được tìm thấy trong tệp src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Liệt kê 5.8).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Vì chúng tôi đang thực hiện thao tác KStream.map nên phiên bản KStream được trả về sẽ tự động được phân vùng lại khi nó được sử dụng trong kết nối.

Chúng ta đã hoàn tất quá trình chuyển đổi, tiếp theo chúng ta cần tạo đối tượng KTable để đọc tin chứng khoán.

Tạo KTable cho tin tức chứng khoán

May mắn thay, việc tạo một đối tượng KTable chỉ cần một dòng mã (có thể tìm thấy mã này trong src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Liệt kê 5.9).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Điều đáng chú ý là không yêu cầu chỉ định đối tượng Serde vì chuỗi Serdes được sử dụng trong cài đặt. Ngoài ra, bằng cách sử dụng kiểu liệt kê SỚM NHẤT, bảng sẽ chứa đầy các bản ghi ngay từ đầu.

Bây giờ chúng ta có thể chuyển sang bước cuối cùng - kết nối.

Kết nối cập nhật tin tức với dữ liệu số lượng giao dịch

Việc tạo kết nối không khó. Chúng tôi sẽ sử dụng phép nối bên trái trong trường hợp không có tin tức chứng khoán nào cho ngành liên quan (có thể tìm thấy mã cần thiết trong tệp src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Liệt kê 5.10).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Toán tử leftJoin này khá đơn giản. Không giống như các phép nối trong Chương 4, phương thức JoinWindow không được sử dụng vì khi thực hiện phép nối KStream-KTable, chỉ có một mục nhập trong KTable cho mỗi khóa. Kết nối như vậy không bị giới hạn về thời gian: bản ghi có trong KTable hoặc không có. Kết luận chính: bằng cách sử dụng các đối tượng KTable, bạn có thể làm phong phú KStream với dữ liệu tham chiếu ít được cập nhật thường xuyên hơn.

Bây giờ chúng ta sẽ xem xét một cách hiệu quả hơn để làm phong phú thêm các sự kiện từ KStream.

5.3.4. Đối tượng GlobalKTable

Như bạn có thể thấy, cần phải làm phong phú các luồng sự kiện hoặc thêm ngữ cảnh cho chúng. Trong Chương 4, bạn đã thấy kết nối giữa hai đối tượng KStream và trong phần trước bạn đã thấy kết nối giữa KStream và KTable. Trong tất cả các trường hợp này, cần phải phân vùng lại luồng dữ liệu khi ánh xạ các khóa sang một loại hoặc giá trị mới. Đôi khi việc phân vùng lại được thực hiện một cách rõ ràng và đôi khi Kafka Streams thực hiện việc đó một cách tự động. Việc phân vùng lại là cần thiết vì các khóa đã thay đổi và các bản ghi phải kết thúc ở các phần mới, nếu không thì sẽ không thể kết nối được (điều này đã được thảo luận trong Chương 4, trong phần “Phân vùng lại dữ liệu” ở tiểu mục 4.2.4).

Việc phân vùng lại có chi phí

Việc phân vùng lại đòi hỏi chi phí - chi phí tài nguyên bổ sung để tạo chủ đề trung gian, lưu trữ dữ liệu trùng lặp trong chủ đề khác; điều đó cũng có nghĩa là độ trễ tăng lên do ghi và đọc từ chủ đề này. Ngoài ra, nếu bạn cần kết hợp trên nhiều khía cạnh hoặc chiều, bạn phải xâu chuỗi các kết nối, ánh xạ các bản ghi bằng khóa mới và chạy lại quy trình phân vùng lại.

Kết nối với tập dữ liệu nhỏ hơn

Trong một số trường hợp, khối lượng dữ liệu tham chiếu được kết nối tương đối nhỏ, do đó, các bản sao hoàn chỉnh của dữ liệu đó có thể dễ dàng khớp cục bộ trên mỗi nút. Đối với những tình huống như thế này, Kafka Streams cung cấp lớp GlobalKTable.

Các phiên bản GlobalKTable là duy nhất vì ứng dụng sao chép tất cả dữ liệu vào từng nút. Và vì tất cả dữ liệu đều có trên mỗi nút nên không cần phải phân vùng luồng sự kiện bằng khóa dữ liệu tham chiếu để nó có sẵn cho tất cả các phân vùng. Bạn cũng có thể tạo các kết nối không cần chìa khóa bằng cách sử dụng các đối tượng GlobalKTable. Hãy quay lại một trong những ví dụ trước để chứng minh tính năng này.

Kết nối các đối tượng KStream với các đối tượng GlobalKTable

Trong tiểu mục 5.3.2, chúng tôi đã thực hiện tổng hợp cửa sổ các giao dịch trao đổi của người mua. Kết quả của sự tổng hợp này trông giống như thế này:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Mặc dù những kết quả này phục vụ mục đích nhưng sẽ hữu ích hơn nếu tên khách hàng và tên đầy đủ của công ty cũng được hiển thị. Để thêm tên khách hàng và tên công ty, bạn có thể thực hiện các phép nối thông thường, nhưng bạn sẽ cần thực hiện hai ánh xạ chính và phân vùng lại. Với GlobalKTable bạn có thể tránh được chi phí cho những hoạt động đó.

Để thực hiện điều này, chúng ta sẽ sử dụng đối tượng countStream từ Liệt kê 5.11 (có thể tìm thấy mã tương ứng trong src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) và kết nối nó với hai đối tượng GlobalKTable.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Chúng ta đã thảo luận vấn đề này trước đây rồi nên tôi sẽ không nhắc lại. Nhưng tôi lưu ý rằng mã trong hàm toStream().map được trừu tượng hóa thành một đối tượng hàm thay vì biểu thức lambda nội tuyến để dễ đọc.

Bước tiếp theo là khai báo hai phiên bản của GlobalKTable (mã hiển thị có thể được tìm thấy trong tệp src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Liệt kê 5.12).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"

Xin lưu ý rằng tên chủ đề được mô tả bằng cách sử dụng các kiểu liệt kê.

Bây giờ chúng ta đã có tất cả các thành phần sẵn sàng, tất cả những gì còn lại là viết mã cho kết nối (có thể tìm thấy mã này trong tệp src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Liệt kê 5.13).

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Mặc dù có hai phép nối trong mã này nhưng chúng bị xâu chuỗi vì cả hai kết quả của chúng đều không được sử dụng riêng biệt. Kết quả được hiển thị ở phần cuối của toàn bộ hoạt động.

Khi bạn chạy thao tác nối ở trên, bạn sẽ nhận được kết quả như thế này:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Bản chất không thay đổi, nhưng những kết quả này trông rõ ràng hơn.

Nếu đếm ngược đến Chương 4, bạn đã thấy một số loại kết nối đang hoạt động. Chúng được liệt kê trong bảng. 5.2. Bảng này phản ánh khả năng kết nối kể từ phiên bản 1.0.0 của Kafka Streams; Một cái gì đó có thể thay đổi trong phiên bản tương lai.

Cuốn sách “Kafka Streams đang hoạt động. Ứng dụng và microservice cho công việc thời gian thực"
Để tóm tắt mọi thứ, hãy tóm tắt lại những điều cơ bản: bạn có thể kết nối luồng sự kiện (KStream) và luồng cập nhật (KTable) bằng trạng thái cục bộ. Ngoài ra, nếu kích thước của dữ liệu tham chiếu không quá lớn, bạn có thể sử dụng đối tượng GlobalKTable. GlobalKTables sao chép tất cả các phân vùng vào từng nút ứng dụng Kafka Streams, đảm bảo rằng tất cả dữ liệu đều có sẵn bất kể khóa tương ứng với phân vùng nào.

Tiếp theo, chúng ta sẽ thấy tính năng Luồng Kafka, nhờ đó chúng ta có thể quan sát các thay đổi trạng thái mà không tiêu tốn dữ liệu từ chủ đề Kafka.

5.3.5. Trạng thái có thể truy vấn

Chúng tôi đã thực hiện một số thao tác liên quan đến trạng thái và luôn xuất kết quả ra bảng điều khiển (cho mục đích phát triển) hoặc ghi chúng vào một chủ đề (cho mục đích sản xuất). Khi viết kết quả cho một chủ đề, bạn phải sử dụng người tiêu dùng Kafka để xem chúng.

Đọc dữ liệu từ các chủ đề này có thể được coi là một loại quan điểm cụ thể hóa. Vì mục đích của mình, chúng tôi có thể sử dụng định nghĩa về chế độ xem cụ thể hóa từ Wikipedia: “...một đối tượng cơ sở dữ liệu vật lý chứa kết quả của một truy vấn. Ví dụ: nó có thể là một bản sao cục bộ của dữ liệu từ xa hoặc một tập hợp con của các hàng và/hoặc cột của một bảng hoặc kết quả nối hoặc một bảng tóm tắt thu được thông qua tổng hợp” (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams cũng cho phép bạn chạy các truy vấn tương tác trên các cửa hàng trạng thái, cho phép bạn đọc trực tiếp các chế độ xem cụ thể hóa này. Điều quan trọng cần lưu ý là truy vấn tới kho lưu trữ trạng thái là thao tác chỉ đọc. Điều này đảm bảo rằng bạn không phải lo lắng về việc vô tình làm cho trạng thái không nhất quán trong khi ứng dụng của bạn đang xử lý dữ liệu.

Khả năng truy vấn trực tiếp các cửa hàng trạng thái là quan trọng. Điều này có nghĩa là bạn có thể tạo các ứng dụng bảng điều khiển mà không cần phải tìm nạp dữ liệu từ người tiêu dùng Kafka trước. Nó cũng làm tăng hiệu quả của ứng dụng, do không cần phải ghi lại dữ liệu:

  • nhờ vào vị trí của dữ liệu, chúng có thể được truy cập nhanh chóng;
  • loại bỏ sự trùng lặp dữ liệu vì nó không được ghi vào bộ nhớ ngoài.

Điều chính tôi muốn bạn nhớ là bạn có thể truy vấn trực tiếp trạng thái từ bên trong ứng dụng của mình. Những cơ hội mà điều này mang lại cho bạn không thể bị phóng đại. Thay vì sử dụng dữ liệu từ Kafka và lưu trữ các bản ghi trong cơ sở dữ liệu cho ứng dụng, bạn có thể truy vấn các kho lưu trữ trạng thái với kết quả tương tự. Truy vấn trực tiếp tới các cửa hàng tiểu bang có nghĩa là ít mã hơn (không có người tiêu dùng) và ít phần mềm hơn (không cần bảng cơ sở dữ liệu để lưu trữ kết quả).

Chúng ta đã đề cập khá nhiều vấn đề trong chương này, vì vậy bây giờ chúng ta sẽ tạm dừng thảo luận về các truy vấn tương tác đối với các cửa hàng nhà nước. Nhưng đừng lo lắng: trong Chương 9, chúng ta sẽ tạo một ứng dụng bảng điều khiển đơn giản với các truy vấn tương tác. Nó sẽ sử dụng một số ví dụ từ chương này và các chương trước để minh họa các truy vấn tương tác và cách bạn có thể thêm chúng vào ứng dụng Kafka Streams.

Tóm tắt thông tin

  • Các đối tượng KStream đại diện cho các luồng sự kiện, có thể so sánh với việc chèn vào cơ sở dữ liệu. Các đối tượng KTable đại diện cho các luồng cập nhật, giống như các bản cập nhật cho cơ sở dữ liệu hơn. Kích thước của đối tượng KTable không tăng lên, các bản ghi cũ được thay thế bằng bản ghi mới.
  • Đối tượng KTable là cần thiết cho các hoạt động tổng hợp.
  • Bằng cách sử dụng các thao tác cửa sổ, bạn có thể chia dữ liệu tổng hợp thành các nhóm thời gian.
  • Nhờ các đối tượng GlobalKTable, bạn có thể truy cập dữ liệu tham chiếu ở bất kỳ đâu trong ứng dụng, bất kể phân vùng.
  • Có thể kết nối giữa các đối tượng KStream, KTable và GlobalKTable.

Cho đến nay, chúng tôi đã tập trung vào việc xây dựng các ứng dụng Kafka Streams bằng KStream DSL cấp cao. Mặc dù cách tiếp cận cấp cao cho phép bạn tạo các chương trình gọn gàng và súc tích nhưng việc sử dụng nó sẽ phải đánh đổi. Làm việc với DSL KStream có nghĩa là tăng tính đồng nhất cho mã của bạn bằng cách giảm mức độ kiểm soát. Trong chương tiếp theo, chúng ta sẽ xem xét API nút xử lý cấp thấp và thử các sự cân bằng khác. Các chương trình sẽ dài hơn trước đây, nhưng chúng ta sẽ có thể tạo hầu hết mọi nút xử lý mà chúng ta có thể cần.

→ Thông tin chi tiết về cuốn sách có thể tìm thấy tại trang web của nhà xuất bản

→ Đối với Habrozhiteli, giảm giá 25% khi sử dụng phiếu giảm giá - Suối Kafka

→ Sau khi thanh toán cho phiên bản giấy của cuốn sách, một cuốn sách điện tử sẽ được gửi qua e-mail.

Nguồn: www.habr.com

Thêm một lời nhận xét