【workbunny】Redis-Stream隊列

計劃
2.2 版本已發(fā)布
- 新增 自適應定時器、自適應Builder
- 自適應定時器/自適應Builder:以 時間間隔 啟動,滿足 閑置時長 后以 退避指數 x 時間間隔 進行定時間隔,最大不超過 最大間隔;
- 修復了一些已知bug
- 兼容workerman 5.0
文檔
https://github.com/workbunny/webman-rqueue/blob/master/README.md
常見問題
-
什么時候使用消息隊列?
當你需要對系統(tǒng)進行解耦、削峰、異步的時候;如發(fā)送短信驗證碼、秒殺活動、資產的異步分賬清算等。
-
RabbitMQ和Redis的區(qū)別?
Redis中的Stream的特性同樣適用于消息隊列,并且也包含了比較完善的ACK機制,但在一些點上與RabbitMQ存在不同:
- Redis Stream沒有完善的后臺管理;RabbitMQ擁有較為完善的后臺管理及Api;
- Redis的持久化策略取舍:默認的RDB策略極端情況下存在丟失數據,AOF策略則需要犧牲一些性能;RabbitMQ持久化方案更多,可對消息持久化也可對隊列持久化;
- RabbitMQ擁有更多的插件可以提供更完善的協議支持及功能支持;
-
什么時候使用Redis?什么時候使用RabbitMQ?
當你的隊列使用比較單一或者比較輕量的時候,請選用 Redis Stream;當你需要一個比較完整的消息隊列體系,包括需要利用交換機來綁定不同隊列做一些比較復雜的消息任務的時候,請選擇RabbitMQ;
當然,如果你的隊列使用也比較單一,但你需要用到一些管理后臺相關系統(tǒng)化的功能的時候,又不想花費太多時間去開發(fā)的時候,也可以使用RabbitMQ;因為RabbitMQ提供了一整套后臺管理的體系及 HTTP API 供開發(fā)者兼容到自己的管理后臺中,不需要再消耗多余的時間去開發(fā)功能;
注:這里的 輕量 指的是 無須將應用中的隊列服務獨立化,該隊列服務是該應用獨享的
簡介
- 基于Redis Stream的輕量級隊列;
- Queue 模式:多個消費者競爭消費
- 支持普通消費
- 支持延遲消費
- Group 模式:多個消費組訂閱消費
- 支持普通消費
- 支持延遲消費
- Adaptive 模式:Queue的自適應版消費者
- 支持普通消費
- 支持延遲消費
- 可靠的重載機制,防止消息意外丟失/重復消費
- 使用本地sqlite庫儲存因意外中斷的消息
- 自動加載本地消息至隊列
- 簡單易用容易理解的使用方式
- 豐富的命令行助手,助手函數
- BuilderClass繼承模式(類似ORM的ModelClass)
- 支持自定義BuilderClass,實現自定義的消費邏輯
安裝
環(huán)境依賴
- php >=8.0
- webman >= 1.0
- redis >= 6.2
composer require workbunny/webman-rqueue
使用
QueueBuilder
- 一個QueueBuilder類對應一個消費Group和一個消費邏輯
QueueBuilder::handler()
- 一個QueueBuilder可對應一個/多個Redis-Stream-Key,通過配置
QueueBuilder::$config['queues']
- QueueBuilder類使用定時器進行消費,每一次消費之后會根據消息的屬性
_header['_delete']
來進行消息釋放
命令行
--mode默認為queue
- 創(chuàng)建
# 創(chuàng)建一個擁有單進程消費者的QueueBuilder
./webman workbunny:rqueue-builder testQueue --mode=queue
./webman workbunny:rqueue-builder testQueue -m queue
# 創(chuàng)建一個擁有4進程消費者的QueueBuilder
./webman workbunny:rqueue-builder testQueue 4 --mode=queue
./webman workbunny:rqueue-builder testQueue 4 -m queue
# 創(chuàng)建一個擁有單進程消費者的延遲QueueBuilder
./webman workbunny:rqueue-builder testQueue --delayed --mode=queue
./webman workbunny:rqueue-builder testQueue -dm queue
# 創(chuàng)建一個擁有4進程消費者的延遲QueueBuilder
./webman workbunny:rqueue-builder testQueue 4 --delayed --mode=queue
./webman workbunny:rqueue-builder testQueue 4 -dm queue
# 在 process/workbunny/rqueue 目錄下創(chuàng)建 TestQueueBuilder.php
./webman workbunny:rqueue-builder testQueue
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建 TestQueueBuilder.php
./webman workbunny:rqueue-builder project/testQueue
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建 TestAllQueueBuilder.php
./webman workbunny:rqueue-builder project/testAllQueue
# 延遲同理
- 移除
移除包含了類文件的移除和配置的移除
# 移除Builder
./webman workbunny:rqueue-remove testQueue
# 移除延遲Builder
./webman workbunny:rqueue-remove testQueue --delayed
./webman workbunny:rqueue-remove testQueue -d
# 二級菜單同理
- 開啟
開啟僅對配置進行移除
# 開啟Builder
./webman workbunny:rqueue-builder test --open --mode=queue
./webman workbunny:rqueue-builder test -om queue
# 開啟延遲Builder
./webman workbunny:rqueue-builder test --open --delayed --mode=queue
./webman workbunny:rqueue-builder test -odm queue
# 二級菜單同理
- 關閉
關閉僅對配置進行移除
# 關閉Builder
./webman workbunny:rqueue-remove test --close --mode=queue
./webman workbunny:rqueue-remove test -cm queue
# 關閉延遲Builder
./webman workbunny:rqueue-remove test --close --delayed --mode=queue
./webman workbunny:rqueue-remove test -cdm queue
# 二級菜單同理
GroupBuilder
- 一個GroupBuilder類對應一個消費Group和一個消費邏輯
QueueBuilder::handler()
- 一個GroupBuilder可對應一個/多個Redis-Stream-Key,通過配置
QueueBuilder::$config['queues']
- GroupBuilder類使用定時器進行消費,使用定時器釋放當前 Stream-Key 上所有Group收取過的閑置消息
- 可以使用多個GroupBuilder類配置相同的
QueueBuilder::$config['queues']
,從而達到一條/多條隊列由不同的消費邏輯進行處理;- 基于此特性,可以實現消息持久化的發(fā)布訂閱
- 基于此特性,可以實現RabbitMQ的exchange模式
命令行
- 創(chuàng)建
# 創(chuàng)建一個擁有單進程消費者的GroupBuilder
./webman workbunny:rqueue-builder testGroup --mode=group
./webman workbunny:rqueue-builder testGroup -m group
# 創(chuàng)建一個擁有4進程消費者的GroupBuilder
./webman workbunny:rqueue-builder testGroup 4 --mode=group
./webman workbunny:rqueue-builder testGroup 4 -m group
# 創(chuàng)建一個擁有單進程消費者的延遲GroupBuilder
./webman workbunny:rqueue-builder testGroup --delayed--mode=group
./webman workbunny:rqueue-builder testGroup -dm group
# 創(chuàng)建一個擁有4進程消費者的延遲GroupBuilder
./webman workbunny:rqueue-builder testGroup 4 --delayed--mode=group
./webman workbunny:rqueue-builder testGroup 4 -dm group
# 二級菜單
# 在 process/workbunny/rqueue 目錄下創(chuàng)建 TestGroupBuilder.php
./webman workbunny:rqueue-builder testGroup --mode=group
./webman workbunny:rqueue-builder testGroup -m group
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建 TestGroupBuilder.php
./webman workbunny:rqueue-builder project/testGroup --mode=group
./webman workbunny:rqueue-builder project/testGroup -m group
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建 TestAllGroupBuilder.php
./webman workbunny:rqueue-builder project/testAllGroup --mode=group
./webman workbunny:rqueue-builder project/testAllGroup -m group
- 移除
移除包含了類文件的移除和配置的移除
# 移除Builder
./webman workbunny:rqueue-remove testGroup --mode=group
./webman workbunny:rqueue-remove testGroup -m group
# 移除延遲Builder
./webman workbunny:rqueue-remove testGroup --delayed --mode=group
./webman workbunny:rqueue-remove testGroup -dm group
# 二級菜單同理
- 開啟
開啟僅對配置進行移除
# 開啟Builder
./webman workbunny:rqueue-builder testGroup --open --mode=group
./webman workbunny:rqueue-builder testGroup -om group
# 開啟延遲Builder
./webman workbunny:rqueue-builder testGroup --open --delayed --mode=group
./webman workbunny:rqueue-builder testGroup -odm group
# 二級菜單同理
- 關閉
關閉僅對配置進行移除
# 關閉Builder
./webman workbunny:rqueue-remove testGroup --close --mode=group
./webman workbunny:rqueue-remove testGroup -cm group
# 關閉延遲Builder
./webman workbunny:rqueue-remove testGroup --close --delayed --mode=group
./webman workbunny:rqueue-remove testGroup -cdm group
# 二級菜單同理
AdaptiveBuilder
說明
- AdaptiveBuilder與QueueBuilder的消費方式一致
- AdaptiveBuilder底層定時器會根據閑置閾值進行判斷,消費定時器根據退避指數 x 當前消費間隔進行重置消費間隔
相較于其他Builder的優(yōu)勢
- 在消息負載較高的情況下,AdaptiveBuilder是普通的QueueBuilder
- 在消息負載較低的情況下,AdaptiveBuilder根據閑置閾值對消費者的消費查詢速率進行自適應退避調整,有效減少redis的查詢壓力
- 在延時消費場景下,AdaptiveBuilder能有效減少因頻繁查詢redis而造成的redis-server CPU占用率較高的問題
- 在普通消費模式下,AdaptiveBuilder相比于其他Builder能更快啟動下一個消費周期,無需等待Timer的下一個loop,消費更及時
命令行
- 創(chuàng)建
# 創(chuàng)建一個擁有單進程消費者的AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -m adaptive
# 創(chuàng)建一個擁有4進程消費者的AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive 4 --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive 4 -m adaptive
# 創(chuàng)建一個擁有單進程消費者的延遲AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive --delayed--mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -dm adaptive
# 創(chuàng)建一個擁有4進程消費者的延遲AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive 4 --delayed--mode=adaptive
./webman workbunny:rqueue-builder testAdaptive 4 -dm adaptive
# 二級菜單
# 在 process/workbunny/rqueue 目錄下創(chuàng)建
./webman workbunny:rqueue-builder testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -m adaptive
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建
./webman workbunny:rqueue-builder project/testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder project/testAdaptive -m adaptive
# 在 process/workbunny/rqueue/project 目錄下創(chuàng)建
./webman workbunny:rqueue-builder project/testAllAdaptive --mode=adaptive
./webman workbunny:rqueue-builder project/testAllAdaptive -m adaptive
- 移除
移除包含了類文件的移除和配置的移除
# 移除Builder
./webman workbunny:rqueue-remove testAdaptive --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -m adaptive
# 移除延遲Builder
./webman workbunny:rqueue-remove testAdaptive --delayed --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -dm adaptive
# 二級菜單同理
- 開啟
開啟僅對配置進行移除
# 開啟Builder
./webman workbunny:rqueue-builder testAdaptive --open --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -om adaptive
# 開啟延遲Builder
./webman workbunny:rqueue-builder testAdaptive --open --delayed --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -odm adaptive
# 二級菜單同理
- 關閉
關閉僅對配置進行移除
# 關閉Builder
./webman workbunny:rqueue-remove testAdaptive --close --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -cm adaptive
# 關閉延遲Builder
./webman workbunny:rqueue-remove testAdaptive --close --delayed --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -cdm adaptive
# 二級菜單同理
自定義Builder
如queue/group Builder都不滿足需求,您可繼承 AbstractBuilder 自行實現您所需要的Builder
-
您的Builder基類需要繼承AbstractBuilder實現,可參考QueueBuilder/GroupBuilder
- onWorkerStart 用于進程啟動時的觸發(fā)邏輯,
這里一般使用Timer結合讀取隊列觸發(fā)callback來實現消費隊列 - onWorkerStop 用于進程停止時的回收動作
- onWorkerReload 用于進程重載時的觸發(fā)動作,除非有特殊處理,通常置空
/* * Builder 啟動時 * * @param Worker $worker * @return void */ abstract public function onWorkerStart(Worker $worker): void;
/**
- Builder 停止時
- @param Worker $worker
- @return void
*/
abstract public function onWorkerStop(Worker $worker): void;
/**
- Builder 重加載時
- @param Worker $worker
- @return void
*/
abstract public function onWorkerReload(Worker $worker): void;
- onWorkerStart 用于進程啟動時的觸發(fā)邏輯,
- classContent方法是配合命令行,用于自動生成隊列文件,如不使用,可置空
/** * Command 獲取需要創(chuàng)建的類文件內容 * * @param string $namespace * @param string $className * @param bool $isDelay * @return string */ abstract public static function classContent(string $namespace, string $className, bool $isDelay): string;
- Traits類提供所需的基礎方法,按需在您的Builder基類中引用
- MessageQueueMethod 提供針對redis-stream隊列的基礎操作,如ack、publish、consume等、
- <a id='temp'>MessageTempMethod 提供本地數據緩存,用于對異常數據的收集和requeue</a>
- 如上述traits無法滿足需求,可自定義Traits
注意
- QueueBuilder與GroupBuilder在命令行自動生成時沒有做類似Delayed的區(qū)分,用戶可自行進行命名區(qū)分,如:
# 創(chuàng)建一個GroupBuilder
./webman workbunny:rqueue-builder testGroup --mode=group
# 創(chuàng)建一個QueueBuilder
./webman workbunny:rqueue-builder testQueue --mode=queue
-
創(chuàng)建的Builder類可以手動修改調整
-
為Builder添加進process.php的配置可以手動修改
查看Builder
./webman workbunny:rqueue-list
注:當 Builder 未啟動時,handler 與 count 顯示為 --
+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| name | file | handler | count | mode |
+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| test | /var/www/your-project/process/workbunny/rqueue/TestBuilder.php | process\workbunny\rqueue\TestBuilder | 1 | queue |
| test -d | /var/www/your-project/process/workbunny/rqueue/TestBuilderDelayed.php | process\workbunny\rqueue\TestBuilderDelayed | 1 | group |
+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+
生產
發(fā)布普通消息
注:向普通隊列發(fā)布延遲消息會拋出一個 WebmanRqueueException 異常
use function Workbunny\WebmanRqueue\sync_publish;
use function Workbunny\WebmanRqueue\sync_publish_get_ids;
use process\workbunny\rqueue\TestBuilder;
# 使用函數發(fā)布,返回受影響條數,多隊列不具備事務一致
/** headers參數詳見 @link Header */
sync_publish(TestBuilder::instance(), 'abc', [
'_delete' => false
]);
# 使用對象發(fā)布,返回受影響條數,多隊列不具備事務一致
/** headers參數詳見 @link Header */
TestBuilder::instance()->publish('abc', [
'_delete' => false
]);
# 返回消息ID組(數組),多隊列不具備事務一致
sync_publish_get_ids(TestBuilder::instance(), 'abc', [
'_delete' => false
]);
# 返回消息ID組(數組),多隊列不具備事務一致
TestBuilder::instance()->publishGetIds('abc', [
'_delete' => false
]);
發(fā)布延遲消息
注:向延遲隊列發(fā)布普通消息會拋出一個 WebmanRqueueException 異常
注:延遲隊列發(fā)布消息不支持指定消息id
use function Workbunny\WebmanRqueue\sync_publish;
use function Workbunny\WebmanRqueue\sync_publish_get_ids;
use process\workbunny\rqueue\TestBuilder;
# 延遲10ms,返回受影響條數,多隊列不具備事務一致
sync_publish(TestBuilder::instance(), 'abc', [
'_delay' => 10
]);
# 延遲10ms,返回受影響條數,多隊列不具備事務一致
TestBuilder::instance()->publish('abc', [
'_delay' => 10
]);
# 延遲10ms,返回消息ID組(數組),多隊列不具備事務一致
sync_publish_get_ids(TestBuilder::instance(), 'abc', [
'_delay' => 10
]);
# 延遲10ms,返回消息ID組(數組),多隊列不具備事務一致
TestBuilder::instance()->publishGetIds('abc', [
'_delay' => 10
]);
說明
-
生產可用,歡迎 issue 和 PR;
-
Redis Stream 本身沒有 delayed 或 non-delayed 之分,組件代碼將它們區(qū)分的原因是不希望 delayed 被濫用;開發(fā)者應該明確哪些消息是延遲的、哪些是立即的,并且明確體現,也方便維護,因為延遲消息過多會導致消息堆積,從而占用Redis過多的資源;
-
延遲隊列是通過對消息的不斷讀取放回并且加以判斷是否達到延遲觸發(fā)時間來進行模擬延遲觸發(fā)的效果,消息不支持指定id,請在邏輯內自行實現;
-
Redis Stream 的持久化依賴 Redis 本身的持久化策略,在一定情況下 Redis Stream 也并非是可靠型的消息隊列;關于持久化相關內容,請仔細閱讀 Redis中文文檔;
-
本地重載機制使用了SQLite3,詳見 src/Builders/Traits/MessageTempMethod