Redis Stream - độ tin cậy và khả năng mở rộng của hệ thống nhắn tin của bạn

Redis Stream - độ tin cậy và khả năng mở rộng của hệ thống nhắn tin của bạn

Redis Stream là kiểu dữ liệu trừu tượng mới được giới thiệu trong Redis với phiên bản 5.0
Về mặt khái niệm, Redis Stream là một Danh sách mà bạn có thể thêm các mục vào. Mỗi mục có một mã định danh duy nhất. Theo mặc định, ID được tạo tự động và bao gồm dấu thời gian. Do đó, bạn có thể truy vấn phạm vi bản ghi theo thời gian hoặc nhận dữ liệu mới khi nó đến trong luồng, giống như lệnh "tail -f" Unix đọc tệp nhật ký và đóng băng trong khi chờ dữ liệu mới. Lưu ý rằng nhiều máy khách có thể nghe một luồng cùng lúc, giống như nhiều tiến trình "tail -f" có thể đọc một tệp đồng thời mà không xung đột với nhau.

Để hiểu tất cả lợi ích của loại dữ liệu mới, chúng ta hãy xem nhanh các cấu trúc Redis tồn tại từ lâu nhằm sao chép một phần chức năng của Redis Stream.

Làm lại PUB/SUB

Redis Pub/Sub là một hệ thống nhắn tin đơn giản đã được tích hợp sẵn trong kho lưu trữ khóa-giá trị của bạn. Tuy nhiên, sự đơn giản có cái giá phải trả:

  • Nếu nhà xuất bản vì lý do nào đó không thành công thì sẽ mất tất cả người đăng ký
  • Nhà xuất bản cần biết địa chỉ chính xác của tất cả người đăng ký
  • Nhà xuất bản có thể khiến người đăng ký của mình quá tải nếu dữ liệu được xuất bản nhanh hơn tốc độ xử lý
  • Tin nhắn sẽ bị xóa khỏi bộ đệm của nhà xuất bản ngay sau khi xuất bản, bất kể nó được gửi đến bao nhiêu người đăng ký và họ có thể xử lý tin nhắn này nhanh như thế nào.
  • Tất cả các thuê bao sẽ nhận được tin nhắn cùng một lúc. Bản thân người đăng ký bằng cách nào đó phải đồng ý với nhau về thứ tự xử lý cùng một tin nhắn.
  • Không có cơ chế tích hợp nào để xác nhận rằng thuê bao đã xử lý tin nhắn thành công. Nếu người đăng ký nhận được tin nhắn và gặp sự cố trong quá trình xử lý, nhà xuất bản sẽ không biết về tin nhắn đó.

Danh sách làm lại

Redis List là cấu trúc dữ liệu hỗ trợ chặn các lệnh đọc. Bạn có thể thêm và đọc tin nhắn từ đầu hoặc cuối danh sách. Dựa trên cấu trúc này, bạn có thể tạo một ngăn xếp hoặc hàng đợi tốt cho hệ thống phân tán của mình và trong hầu hết các trường hợp, điều này là đủ. Sự khác biệt chính so với Redis Pub/Sub:

  • Tin nhắn được gửi đến một khách hàng. Máy khách bị chặn đọc đầu tiên sẽ nhận được dữ liệu trước.
  • Clint phải tự mình bắt đầu thao tác đọc từng tin nhắn. Danh sách không biết gì về khách hàng.
  • Tin nhắn được lưu trữ cho đến khi ai đó đọc chúng hoặc xóa chúng một cách rõ ràng. Nếu bạn định cấu hình máy chủ Redis để xóa dữ liệu vào đĩa thì độ tin cậy của hệ thống sẽ tăng lên đáng kể.

Giới thiệu về luồng

Thêm mục nhập vào luồng

Đội XADD thêm một mục mới vào luồng. Bản ghi không chỉ là một chuỗi mà nó bao gồm một hoặc nhiều cặp khóa-giá trị. Do đó, mỗi mục nhập đã được cấu trúc sẵn và giống với cấu trúc của tệp CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Trong ví dụ trên, chúng tôi thêm hai trường vào luồng có tên (khóa) “mystream”: “sensor-id” và “nhiệt độ” với các giá trị lần lượt là “1234” và “19.8”. Là đối số thứ hai, lệnh lấy một mã định danh sẽ được gán cho mục nhập - mã định danh này xác định duy nhất từng mục nhập trong luồng. Tuy nhiên, trong trường hợp này, chúng tôi đã vượt qua * vì chúng tôi muốn Redis tạo ID mới cho chúng tôi. Mỗi ID mới sẽ tăng lên. Do đó, mỗi mục mới sẽ có mã định danh cao hơn so với các mục trước đó.

Định dạng định danh

ID mục được trả về bởi lệnh XADD, gồm hai phần:

{millisecondsTime}-{sequenceNumber}

mili giâyThời gian — Thời gian Unix tính bằng mili giây (thời gian máy chủ Redis). Tuy nhiên, nếu thời gian hiện tại bằng hoặc nhỏ hơn thời gian của bản ghi trước đó thì dấu thời gian của bản ghi trước đó sẽ được sử dụng. Do đó, nếu thời gian của máy chủ quay ngược thời gian, mã định danh mới sẽ vẫn giữ thuộc tính tăng dần.

serialNumber được sử dụng cho các bản ghi được tạo trong cùng một phần nghìn giây. serialNumber sẽ tăng thêm 1 so với mục trước đó. Bởi vì serialNumber có kích thước 64 bit thì trong thực tế, bạn không nên gặp phải giới hạn về số lượng bản ghi có thể được tạo trong vòng một mili giây.

Định dạng của các mã nhận dạng như vậy thoạt nhìn có vẻ lạ. Một độc giả không tin tưởng có thể thắc mắc tại sao thời gian lại là một phần của định danh. Lý do là các luồng Redis hỗ trợ truy vấn phạm vi theo ID. Vì mã định danh được liên kết với thời gian bản ghi được tạo nên điều này giúp bạn có thể truy vấn phạm vi thời gian. Chúng ta sẽ xem xét một ví dụ cụ thể khi chúng ta xem lệnh XƯƠNG.

Nếu vì lý do nào đó mà người dùng cần chỉ định mã định danh của riêng mình, chẳng hạn như mã định danh được liên kết với một số hệ thống bên ngoài, thì chúng ta có thể chuyển nó vào lệnh XADD thay vì * như hiển thị bên dưới:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Xin lưu ý rằng trong trường hợp này bạn phải tự mình theo dõi việc tăng ID. Trong ví dụ của chúng tôi, mã định danh tối thiểu là "0-1", do đó lệnh sẽ không chấp nhận mã định danh khác bằng hoặc nhỏ hơn "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Số lượng bản ghi trên mỗi luồng

Có thể lấy số lượng bản ghi trong một luồng chỉ bằng cách sử dụng lệnh XLEN. Ví dụ của chúng tôi, lệnh này sẽ trả về giá trị sau:

> XLEN somestream
(integer) 2

Truy vấn phạm vi - XRANGE và XREVRANGE

Để yêu cầu dữ liệu theo phạm vi, chúng ta cần chỉ định hai mã định danh - phần đầu và phần cuối của phạm vi. Phạm vi trả về sẽ bao gồm tất cả các phần tử, kể cả các ranh giới. Ngoài ra còn có hai mã định danh đặc biệt “-” và “+”, lần lượt có nghĩa là mã định danh nhỏ nhất (bản ghi đầu tiên) và lớn nhất (bản ghi cuối cùng) trong luồng. Ví dụ dưới đây sẽ liệt kê tất cả các mục luồng.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Mỗi bản ghi được trả về là một mảng gồm hai phần tử: một mã định danh và một danh sách các cặp khóa-giá trị. Chúng tôi đã nói rằng số nhận dạng bản ghi có liên quan đến thời gian. Vì vậy, chúng tôi có thể yêu cầu một khoảng thời gian cụ thể. Tuy nhiên, chúng ta có thể chỉ định trong yêu cầu không phải mã định danh đầy đủ mà chỉ có thời gian Unix, bỏ qua phần liên quan đến serialNumber. Phần bị bỏ qua của mã định danh sẽ tự động được đặt thành XNUMX ở đầu phạm vi và thành giá trị tối đa có thể có ở cuối phạm vi. Dưới đây là ví dụ về cách bạn có thể yêu cầu phạm vi hai mili giây.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Chúng tôi chỉ có một mục trong phạm vi này, tuy nhiên trong các tập dữ liệu thực, kết quả trả về có thể rất lớn. Vì lý do này XƯƠNG hỗ trợ tùy chọn COUNT. Bằng cách chỉ định số lượng, chúng ta có thể dễ dàng nhận được N bản ghi đầu tiên. Nếu chúng ta cần lấy N bản ghi tiếp theo (phân trang), chúng ta có thể sử dụng ID nhận được cuối cùng, tăng nó lên serialNumber một cái và hỏi lại. Hãy xem xét điều này trong ví dụ sau. Chúng tôi bắt đầu thêm 10 phần tử với XADD (giả sử mystream đã chứa đầy 10 phần tử). Để bắt đầu lặp lại nhận 2 phần tử cho mỗi lệnh, chúng ta bắt đầu với phạm vi đầy đủ nhưng với COUNT bằng 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Để tiếp tục lặp lại với hai phần tử tiếp theo, chúng ta cần chọn ID cuối cùng nhận được, tức là 1519073279157-0 và thêm 1 vào serialNumber.
ID kết quả, trong trường hợp này là 1519073279157-1, hiện có thể được sử dụng làm đối số bắt đầu mới của phạm vi cho cuộc gọi tiếp theo XƯƠNG:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Và như thế. Bởi vì sự phức tạp XƯƠNG là O(log(N)) để tìm kiếm và sau đó là O(M) để trả về M phần tử, khi đó mỗi bước lặp sẽ nhanh. Như vậy, sử dụng XƯƠNG luồng có thể được lặp lại một cách hiệu quả.

Đội XEVRANGE là tương đương XƯƠNG, nhưng trả về các phần tử theo thứ tự ngược lại:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Xin lưu ý rằng lệnh XEVRANGE lấy các đối số phạm vi bắt đầu và dừng theo thứ tự ngược lại.

Đọc các mục mới bằng XREAD

Thông thường, nhiệm vụ phát sinh là đăng ký một luồng và chỉ nhận tin nhắn mới. Khái niệm này có vẻ giống với Redis Pub/Sub hoặc chặn Redis List, nhưng có những khác biệt cơ bản trong cách sử dụng Redis Stream:

  1. Theo mặc định, mỗi tin nhắn mới sẽ được gửi đến mọi người đăng ký. Hành vi này khác với Danh sách Redis chặn, trong đó tin nhắn mới sẽ chỉ được đọc bởi một người đăng ký.
  2. Trong khi ở Redis Pub/Sub, tất cả các tin nhắn đều bị lãng quên và không bao giờ tồn tại thì trong Stream, tất cả các tin nhắn đều được giữ lại vô thời hạn (trừ khi ứng dụng khách rõ ràng yêu cầu xóa).
  3. Redis Stream cho phép bạn phân biệt quyền truy cập vào tin nhắn trong một luồng. Một người đăng ký cụ thể chỉ có thể xem lịch sử tin nhắn cá nhân của họ.

Bạn có thể đăng ký một chủ đề và nhận tin nhắn mới bằng lệnh XREAD. Nó phức tạp hơn một chút so với XƯƠNG, vì vậy trước tiên chúng ta sẽ bắt đầu với những ví dụ đơn giản hơn.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Ví dụ trên cho thấy một hình thức không chặn XREAD. Lưu ý rằng tùy chọn COUNT là tùy chọn. Trên thực tế, tùy chọn lệnh bắt buộc duy nhất là tùy chọn STREAMS, tùy chọn này chỉ định danh sách các luồng cùng với mã định danh tối đa tương ứng. Chúng tôi đã viết “STREAMS mystream 0” - chúng tôi muốn nhận tất cả các bản ghi của luồng mystream có mã định danh lớn hơn “0-0”. Như bạn có thể thấy trong ví dụ, lệnh trả về tên của luồng vì chúng ta có thể đăng ký nhiều luồng cùng một lúc. Ví dụ: chúng ta có thể viết "STREAMS mystream otherstream 0 0". Xin lưu ý rằng sau tùy chọn STREAMS, trước tiên chúng tôi cần cung cấp tên của tất cả các luồng được yêu cầu và chỉ sau đó là danh sách số nhận dạng.

Ở dạng đơn giản này, lệnh không làm gì đặc biệt so với XƯƠNG. Tuy nhiên, điều thú vị là chúng ta có thể dễ dàng biến XREAD vào lệnh chặn, chỉ định đối số BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Trong ví dụ trên, tùy chọn BLOCK mới được chỉ định với thời gian chờ là 0 mili giây (điều này có nghĩa là phải chờ vô thời hạn). Hơn nữa, thay vì chuyển mã định danh thông thường cho luồng mystream, một mã định danh đặc biệt $ đã được chuyển. Mã định danh đặc biệt này có nghĩa là XREAD phải sử dụng mã định danh tối đa trong mystream làm mã định danh. Vì vậy chúng ta sẽ chỉ nhận được tin nhắn mới kể từ thời điểm bắt đầu nghe. Ở một khía cạnh nào đó, lệnh này tương tự như lệnh "tail -f" của Unix.

Lưu ý rằng khi sử dụng tùy chọn BLOCK chúng ta không nhất thiết phải sử dụng mã định danh đặc biệt $. Chúng tôi có thể sử dụng bất kỳ mã định danh nào hiện có trong luồng. Nếu nhóm có thể phục vụ yêu cầu của chúng tôi ngay lập tức mà không bị chặn thì họ sẽ làm như vậy, nếu không thì sẽ chặn.

Chặn XREAD cũng có thể nghe nhiều luồng cùng một lúc, bạn chỉ cần chỉ định tên của chúng. Trong trường hợp này, lệnh sẽ trả về bản ghi của luồng đầu tiên nhận dữ liệu. Người đăng ký đầu tiên bị chặn đối với một chuỗi nhất định sẽ nhận được dữ liệu trước.

Nhóm người tiêu dùng

Trong một số tác vụ nhất định, chúng tôi muốn giới hạn quyền truy cập của người đăng ký vào các tin nhắn trong một chuỗi. Một ví dụ mà điều này có thể hữu ích là hàng đợi tin nhắn với các công nhân sẽ nhận được các tin nhắn khác nhau từ một chuỗi, cho phép xử lý tin nhắn theo quy mô.

Nếu chúng ta tưởng tượng rằng chúng ta có ba người đăng ký C1, C2, C3 và một chuỗi chứa các tin nhắn 1, 2, 3, 4, 5, 6, 7 thì các tin nhắn sẽ được phân phát như trong sơ đồ bên dưới:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Để đạt được hiệu ứng này, Redis Stream sử dụng một khái niệm gọi là Nhóm người tiêu dùng. Khái niệm này tương tự như một thuê bao giả, nhận dữ liệu từ một luồng nhưng thực tế được phục vụ bởi nhiều người đăng ký trong một nhóm, cung cấp một số đảm bảo nhất định:

  1. Mỗi tin nhắn được gửi đến một người đăng ký khác nhau trong nhóm.
  2. Trong một nhóm, người đăng ký được xác định bằng tên của họ, đây là một chuỗi phân biệt chữ hoa chữ thường. Nếu một người đăng ký tạm thời rời khỏi nhóm, anh ta có thể được khôi phục vào nhóm bằng tên riêng của mình.
  3. Mọi nhóm người tiêu dùng đều tuân theo khái niệm “tin nhắn chưa đọc đầu tiên”. Khi một thuê bao yêu cầu tin nhắn mới, nó chỉ có thể nhận những tin nhắn chưa từng được gửi trước đó cho bất kỳ thuê bao nào trong nhóm.
  4. Có một lệnh để xác nhận rõ ràng rằng tin nhắn đã được người đăng ký xử lý thành công. Cho đến khi lệnh này được gọi, tin nhắn được yêu cầu sẽ vẫn ở trạng thái "đang chờ xử lý".
  5. Trong Nhóm Người tiêu dùng, mỗi người đăng ký có thể yêu cầu lịch sử các tin nhắn đã được gửi cho mình nhưng chưa được xử lý (ở trạng thái “đang chờ xử lý”)

Theo một nghĩa nào đó, trạng thái của nhóm có thể được biểu diễn như sau:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Bây giờ là lúc làm quen với các lệnh chính của Nhóm Người tiêu dùng, cụ thể là:

  • XGROUP được sử dụng để tạo, hủy và quản lý nhóm
  • XREADGROUP được sử dụng để đọc luồng qua nhóm
  • XACK - lệnh này cho phép người đăng ký đánh dấu tin nhắn đã được xử lý thành công

Thành lập nhóm người tiêu dùng

Giả sử rằng mystream đã tồn tại. Khi đó lệnh tạo nhóm sẽ như sau:

> XGROUP CREATE mystream mygroup $
OK

Khi tạo nhóm, chúng ta phải chuyển một mã định danh, bắt đầu từ đó nhóm sẽ nhận được tin nhắn. Nếu chúng tôi chỉ muốn nhận tất cả tin nhắn mới thì chúng tôi có thể sử dụng mã định danh đặc biệt $ (như trong ví dụ của chúng tôi ở trên). Nếu bạn chỉ định 0 thay vì một mã định danh đặc biệt thì tất cả thư trong chuỗi sẽ có sẵn cho nhóm.

Bây giờ nhóm đã được tạo, chúng ta có thể bắt đầu đọc tin nhắn ngay lập tức bằng lệnh XREADGROUP. Lệnh này rất giống với XREAD và hỗ trợ tùy chọn BLOCK tùy chọn. Tuy nhiên, có một tùy chọn NHÓM bắt buộc phải luôn được chỉ định bằng hai đối số: tên nhóm và tên người đăng ký. Tùy chọn COUNT cũng được hỗ trợ.

Trước khi đọc chủ đề, chúng ta hãy đặt một số thông điệp ở đó:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Bây giờ chúng ta hãy thử đọc luồng này qua nhóm:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Lệnh trên đọc nguyên văn như sau:

“Tôi, người đăng ký Alice, một thành viên của nhóm mygroup, muốn đọc một tin nhắn từ luồng của tôi chưa từng được gửi cho bất kỳ ai trước đây.”

Mỗi khi người đăng ký thực hiện một thao tác trên một nhóm, nó phải cung cấp tên của mình, nhận dạng duy nhất chính nó trong nhóm. Có một chi tiết rất quan trọng nữa trong lệnh trên - mã định danh đặc biệt ">". Mã định danh đặc biệt này lọc các tin nhắn, chỉ để lại những tin nhắn chưa từng được gửi trước đó.

Ngoài ra, trong những trường hợp đặc biệt, bạn có thể chỉ định một mã định danh thực như 0 hoặc bất kỳ mã định danh hợp lệ nào khác. Trong trường hợp này lệnh XREADGROUP sẽ trả lại cho bạn lịch sử các tin nhắn có trạng thái "đang chờ xử lý" đã được gửi đến người đăng ký được chỉ định (Alice) nhưng chưa được xác nhận bằng lệnh XACK.

Chúng ta có thể kiểm tra hành vi này bằng cách chỉ định ngay ID 0 mà không cần tùy chọn ĐẾM. Chúng ta sẽ chỉ thấy một tin nhắn đang chờ xử lý, đó là tin nhắn của quả táo:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Tuy nhiên, nếu chúng tôi xác nhận thông báo đã được xử lý thành công thì nó sẽ không còn hiển thị nữa:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Bây giờ đến lượt Bob đọc một cái gì đó:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, một thành viên trong nhóm của tôi, yêu cầu không quá hai tin nhắn. Lệnh chỉ báo cáo các tin nhắn chưa được gửi do mã định danh đặc biệt ">". Như bạn có thể thấy, tin nhắn "quả táo" sẽ không được hiển thị vì nó đã được gửi cho Alice nên Bob nhận được "cam" và "dâu tây".

Bằng cách này, Alice, Bob và bất kỳ người đăng ký nào khác trong nhóm có thể đọc các tin nhắn khác nhau từ cùng một luồng. Họ cũng có thể đọc lịch sử các tin nhắn chưa được xử lý hoặc đánh dấu các tin nhắn là đã xử lý.

Có một số điều cần ghi nhớ:

  • Ngay khi người đăng ký coi tin nhắn là một mệnh lệnh XREADGROUP, tin nhắn này sẽ chuyển sang trạng thái “đang chờ xử lý” và được gán cho thuê bao cụ thể đó. Những người đăng ký nhóm khác sẽ không thể đọc được tin nhắn này.
  • Người đăng ký được tạo tự động khi được đề cập lần đầu, không cần phải tạo chúng một cách rõ ràng.
  • Với XREADGROUP bạn có thể đọc tin nhắn từ nhiều chủ đề khác nhau cùng một lúc, tuy nhiên để làm được việc này, trước tiên bạn cần tạo các nhóm có cùng tên cho mỗi chủ đề bằng cách sử dụng XGROUP

Phục hồi sau thất bại

Người đăng ký có thể khôi phục sau lỗi và đọc lại danh sách tin nhắn của mình với trạng thái “đang chờ xử lý”. Tuy nhiên, trong thế giới thực, người đăng ký cuối cùng có thể thất bại. Điều gì xảy ra với tin nhắn bị kẹt của người đăng ký nếu người đăng ký không thể khôi phục sau khi bị lỗi?
Nhóm Người tiêu dùng cung cấp một tính năng chỉ được sử dụng cho những trường hợp như vậy - khi bạn cần thay đổi chủ sở hữu của tin nhắn.

Điều đầu tiên bạn cần làm là gọi lệnh ĐANG XUẤT HIỆN, hiển thị tất cả các tin nhắn trong nhóm có trạng thái “đang chờ xử lý”. Ở dạng đơn giản nhất, lệnh được gọi chỉ với hai đối số: tên luồng và tên nhóm:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Nhóm đã hiển thị số lượng tin nhắn chưa được xử lý của toàn bộ nhóm và của từng người đăng ký. Chúng tôi chỉ có Bob với hai tin nhắn chưa xử lý vì tin nhắn duy nhất mà Alice yêu cầu đã được xác nhận bằng XACK.

Chúng ta có thể yêu cầu thêm thông tin bằng cách sử dụng nhiều đối số hơn:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - phạm vi số nhận dạng (bạn có thể sử dụng “-” và “+”)
{count} - số lần thử giao hàng
{tên người tiêu dùng} - tên nhóm

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Bây giờ chúng tôi có thông tin chi tiết cho từng tin nhắn: ID, tên người đăng ký, thời gian rảnh tính bằng mili giây và cuối cùng là số lần gửi thử. Chúng tôi có hai tin nhắn từ Bob và chúng không hoạt động trong 74170458 mili giây, khoảng 20 giờ.

Xin lưu ý rằng không ai ngăn chúng tôi kiểm tra nội dung của tin nhắn chỉ bằng cách sử dụng XƯƠNG.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Chúng ta chỉ cần lặp lại cùng một mã định danh hai lần trong các đối số. Bây giờ chúng ta đã có một số ý tưởng, Alice có thể quyết định rằng sau 20 giờ ngừng hoạt động, Bob có thể sẽ không phục hồi và đã đến lúc truy vấn những tin nhắn đó và tiếp tục xử lý chúng cho Bob. Đối với điều này, chúng tôi sử dụng lệnh XÁC NHẬN:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Sử dụng lệnh này, chúng ta có thể nhận được thông báo “nước ngoài” chưa được xử lý bằng cách thay đổi chủ sở hữu thành {consumer}. Tuy nhiên, chúng tôi cũng có thể cung cấp thời gian nhàn rỗi tối thiểu {min-idle-time}. Điều này giúp tránh tình huống hai khách hàng cố gắng thay đổi đồng thời chủ sở hữu của cùng một thư:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Khách hàng đầu tiên sẽ đặt lại thời gian ngừng hoạt động và tăng quầy giao hàng. Vì vậy, khách hàng thứ hai sẽ không thể yêu cầu nó.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Tin nhắn đã được xác nhận thành công bởi Alice, người hiện có thể xử lý tin nhắn và xác nhận nó.

Từ ví dụ trên, bạn có thể thấy rằng một yêu cầu thành công sẽ trả về nội dung của chính tin nhắn đó. Tuy nhiên, điều này là không cần thiết. Tùy chọn JUSTID chỉ có thể được sử dụng để trả về ID tin nhắn. Điều này rất hữu ích nếu bạn không quan tâm đến chi tiết của tin nhắn và muốn tăng hiệu suất hệ thống.

Quầy giao hàng

Bộ đếm bạn nhìn thấy ở đầu ra ĐANG XUẤT HIỆN là số lần gửi của mỗi tin nhắn. Bộ đếm như vậy được tăng lên theo hai cách: khi một tin nhắn được yêu cầu thành công qua XÁC NHẬN hoặc khi một cuộc gọi được sử dụng XREADGROUP.

Việc một số tin nhắn được gửi đi nhiều lần là điều bình thường. Điều chính là tất cả các tin nhắn cuối cùng đều được xử lý. Đôi khi sự cố xảy ra khi xử lý tin nhắn do bản thân tin nhắn bị hỏng hoặc việc xử lý tin nhắn gây ra lỗi trong mã xử lý. Trong trường hợp này, có thể không ai có thể xử lý tin nhắn này. Vì chúng tôi có bộ đếm số lần giao hàng nên chúng tôi có thể sử dụng bộ đếm này để phát hiện các tình huống như vậy. Do đó, khi số lượng gửi đạt đến con số cao mà bạn chỉ định, có lẽ sẽ khôn ngoan hơn nếu đặt một thông báo như vậy trên một chuỗi khác và gửi thông báo cho quản trị viên hệ thống.

Trạng thái chủ đề

Đội XINFO được sử dụng để yêu cầu thông tin khác nhau về một chủ đề và các nhóm của nó. Ví dụ: một lệnh cơ bản trông như thế này:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Lệnh trên hiển thị thông tin chung về luồng được chỉ định. Bây giờ là một ví dụ phức tạp hơn một chút:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Lệnh trên hiển thị thông tin chung cho tất cả các nhóm của luồng được chỉ định

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Lệnh trên hiển thị thông tin cho tất cả người đăng ký của luồng và nhóm được chỉ định.
Nếu bạn quên cú pháp lệnh, chỉ cần hỏi chính lệnh đó để được trợ giúp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Giới hạn kích thước luồng

Nhiều ứng dụng không muốn thu thập dữ liệu vào một luồng mãi mãi. Việc giới hạn số lượng tin nhắn tối đa cho mỗi luồng thường rất hữu ích. Trong các trường hợp khác, sẽ rất hữu ích khi di chuyển tất cả các tin nhắn từ một luồng sang một kho lưu trữ liên tục khác khi đạt đến kích thước luồng được chỉ định. Bạn có thể giới hạn kích thước của luồng bằng tham số MAXLEN trong lệnh XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Khi sử dụng MAXLEN, các bản ghi cũ sẽ tự động bị xóa khi chúng đạt đến độ dài được chỉ định, do đó luồng có kích thước không đổi. Tuy nhiên, việc cắt bớt trong trường hợp này không diễn ra theo cách hiệu quả nhất trong bộ nhớ Redis. Bạn có thể cải thiện tình hình như sau:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Đối số ~ trong ví dụ trên có nghĩa là chúng ta không nhất thiết cần giới hạn độ dài luồng ở một giá trị cụ thể. Trong ví dụ của chúng tôi, đây có thể là bất kỳ số nào lớn hơn hoặc bằng 1000 (ví dụ: 1000, 1010 hoặc 1030). Chúng tôi chỉ xác định rõ ràng rằng chúng tôi muốn luồng của mình lưu trữ ít nhất 1000 bản ghi. Điều này giúp việc quản lý bộ nhớ bên trong Redis hiệu quả hơn nhiều.

Ngoài ra còn có một đội riêng XTRIM, thực hiện điều tương tự:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Lưu trữ và sao chép liên tục

Luồng Redis được sao chép không đồng bộ sang các nút nô lệ và được lưu vào các tệp như AOF (ảnh chụp nhanh của tất cả dữ liệu) và RDB (nhật ký của tất cả các hoạt động ghi). Bản sao trạng thái của Nhóm người tiêu dùng cũng được hỗ trợ. Do đó, nếu một thông báo ở trạng thái “đang chờ xử lý” trên nút chính thì trên các nút phụ, thông báo này sẽ có trạng thái tương tự.

Xóa các phần tử riêng lẻ khỏi luồng

Có một lệnh đặc biệt để xóa tin nhắn XDEL. Lệnh lấy tên của chuỗi theo sau là ID tin nhắn cần xóa:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Khi sử dụng lệnh này, bạn cần lưu ý rằng bộ nhớ thực sẽ không được giải phóng ngay lập tức.

Luồng có độ dài bằng không

Sự khác biệt giữa các luồng và các cấu trúc dữ liệu Redis khác là khi các cấu trúc dữ liệu khác không còn các phần tử bên trong chúng nữa, do tác dụng phụ, chính cấu trúc dữ liệu đó sẽ bị xóa khỏi bộ nhớ. Vì vậy, ví dụ, tập hợp đã sắp xếp sẽ bị xóa hoàn toàn khi lệnh gọi ZREM loại bỏ phần tử cuối cùng. Thay vào đó, các luồng được phép lưu lại trong bộ nhớ ngay cả khi không có bất kỳ phần tử nào bên trong.

Kết luận

Redis Stream lý tưởng để tạo các nhà môi giới tin nhắn, hàng đợi tin nhắn, ghi nhật ký hợp nhất và hệ thống trò chuyện lưu giữ lịch sử.

Như tôi đã từng nói Niklaus Wirth, chương trình là thuật toán cộng với cấu trúc dữ liệu và Redis đã cung cấp cho bạn cả hai.

Nguồn: www.habr.com

Thêm một lời nhận xét