[solarseahorse] redis消息隊列插件

Webman Redis Queue 插件
簡介
webman-redis-queue
是為 Webman 框架設計的高效、靈活的 Redis
隊列插件。利用 Redis Stream 的強大特性,該插件專注于提供可靠和高性能的消息隊列解決方案,適合處理大規(guī)模的數(shù)據(jù)流和復雜的隊列操作。
主要特性
- 基于 Redis Stream: 使用 Redis 最新的 Stream 數(shù)據(jù)類型,為消息隊列和事件流提供優(yōu)化的存儲和訪問。
- 自定義異常重試: 支持自定義的消息處理失敗重試機制,提高消息處理的可靠性。
- 死信隊列處理: 集成死信隊列管理,確保消息不會因處理失敗而丟失。
- 延時隊列支持: 實現(xiàn)延時消息處理,使得定時任務和延遲執(zhí)行變得簡單易行。
- 高效的異常處理機制: 強化的異常處理策略,確保隊列的穩(wěn)定運行。
安裝
通過 Composer 安裝 webman-redis-queue
:
composer require solarseahorse/webman-redis-queue:^1.0.1
版本變更記錄
v1.0.1 (20240128)
新增功能
-
刪除延時消息:
新增removeDelayedMessage
方法,允許移除一條延時消息。 -
批量刪除延時消息:
新增removeDelayedMessages
方法,允許一次性移除多個指定的延時消息。 -
檢查延時消息存在性:
新增hasDelayedMessageExists
方法,用于檢查延時消息是否存在。 -
批量檢查延時消息存在性:
新增hasDelayedMessagesExist
方法,用于批量檢查多個延時消息是否存在。
異常處理
- 引入新的異常類型:
為延時消息的移除和存在性檢查操作引入了DelayedMessageRemoveException
和DelayedMessageCheckException
異常類型。
文檔修正
- 修正文檔中的幾處錯誤:
對插件的官方文檔進行了更新,修正了之前版本中存在的一些描述不準確和排版錯誤。
測試和反饋
我們非常歡迎并鼓勵您在測試環(huán)境中嘗試這個插件,并且分享您的使用體驗。您的反饋對我們改進插件、修復潛在的問題以及發(fā)布未來的穩(wěn)定版本非常重要。如果您在使用過程中遇到任何問題或有任何建議,請通過 GitHub Issues
與我聯(lián)系。
參與貢獻
如果您對改進 webman-redis-queue 有興趣,歡迎任何形式的貢獻,包括但不限于:提交問題、提供反饋、或直接向代碼庫提交改進。您的貢獻將幫助我們更快地推出穩(wěn)定、功能豐富的正式版本。
配置
配置文件自動生成在 config/plugin/solarseahorse/webman-redis-queue目錄下。
1. Redis配置 redis.php
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密碼,字符串類型,可選參數(shù)
'db' => 0, // 數(shù)據(jù)庫
'prefix' => 'webman_redis_queue_', // key 前綴
'timeout' => 2, // Timeout
'ping' => 55, // Ping
'reconnect' => true, // 斷線重連
'max_retries' => 5, // 最大重連次數(shù)
'retry_interval' => 5 , // 重連間隔 s
]
],
];
在webman集群下,每個節(jié)點需要連接同一個redis。
斷線重連
注意:開啟此選項能增加隊列運行穩(wěn)定性,但如果隊列進程過多,redis恢復后可能造成突發(fā)大量連接數(shù),因為每個進程都有一個redis連接。
默認開啟,當Redis發(fā)生重載
,重啟
等情況會嘗試重連,超過最大重試次數(shù)后會報錯并重啟進程(webman默認行為)。
2. 日志配置 log.php
推薦為插件配置單獨日志通道,參考鏈接 webman日志
<?php
return [
'enable' => true, // 啟用日志
'handlers' => support\Log::channel('default') // 默認通道 default
];
在隊列消費業(yè)務邏輯中可以這樣使用日志,使用方法和官方的Log
類使用方法一致。
LogUtility::warning('Error:', [
'data' => $consumerMessage->getData(),
'errorMessage' => $e->getMessage()
]);
4. 隊列配置 process.php
- 在加載類名模式下,每個隊列都擁有獨立的運行進程。
- 每個隊列的配置和數(shù)據(jù)存儲KEY都是獨立的。
- 不推薦目錄模式是因為多個隊列共享進程,其中某個隊列出現(xiàn)異??赡苡绊懙狡渌犃?。
- 隊列的詳細配置都在消費類中配置,配置文件只是基本的進程配置。
<?php
return [
'send-email' => [
'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
'count' => 20, // 在目錄模式中,目錄下所有隊列是共用進程
'constructor' => [
// 支持目錄和類 推薦使用類名
'consumer_source' => \App\queue\test\Email::class
]
]
];
定義消費類
插件對消費類對位置沒有固定要求,符合加載規(guī)范即可。
教程以app/queue/SendEmail.php
舉例,目錄和文件需自行創(chuàng)建。
繼承 SolarSeahorse\WebmanRedisQueue\Consumer
,配置連接標識,并實現(xiàn)抽象方法consume
, 一個最基礎的消費類就創(chuàng)建好了。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標識,對應config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
}
}
編寫完成后需要在隊列配置文件
process.php
中新增隊列配置。
通過命令行創(chuàng)建
通過 php webman solar:make:consumer
命令可快速創(chuàng)建一個消費類。
示例操作:
webman % php webman solar:make:consumer
Please enter the name of the queue: sendCode
Please enter the number of processes (default 1): 1
Please enter the path to create the class in [app/queue]: app/queue/test
最終將會在 app/queue/test
目錄中創(chuàng)建 SendCode.php
文件。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendCode extends Consumer
{
// 連接標識,對應config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
// 消費
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊列數(shù)據(jù)
$data = $consumerMessage->getData();
var_dump($messageId);
}
}
隊列配置文件process.php
也會自動更新。
<?php
return array (
'sendCode' =>
array (
'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
'count' => 1,
'constructor' =>
array (
'consumer_source' => 'app\\queue\\test\\SendCode',
),
),
);
配置屬性
protected string $connection = 'default';
- 連接標識,用于指定 Redis 連接配置。
protected string $queueName = '';
- 隊列名稱,默認自動生成。
protected string $groupName = '';
- 隊列分組名,默認自動生成。
protected string $streamKey = '';
- Stream key,默認自動生成。
protected int $prefetchCount = 1;
- 返回消息的最大數(shù)量。默認為1 不建議修改
- 消費速度可通過提高進程數(shù)并行處理消息,消費者每次讀取多條數(shù)據(jù)是循環(huán)消費,極端情況如循環(huán)消費一半進程重啟會造成大量消息掛起。
protected int $blockTime = 5000;
- 當無消息時堵塞等待的毫秒數(shù),也可作為無消息時的休眠時長。如果隊列以延時隊列為主,應與延時隊列間隔相近。
protected float $consumerTimerInterval = 0.5;
- 消費者處理間隔,消費完一條消息后的等待時間(秒)。
protected int $maxAttempts = 5;
- 消費失敗后的最大重試次數(shù)。
protected int $retrySeconds = 60;
- 重試間隔(秒)。
protected bool $autoAck = true;
- 是否自動確認消息。開啟的同時同樣建議在業(yè)務邏輯中顯式調(diào)用
ack
方法。
protected bool $autoDel = true;
- 是否自動刪除已確認成功的消息。
protected int $delayedQueueOnceHandlerCount = 128;
- 延時隊列每次處理數(shù)量,根據(jù)生產(chǎn)速率適當配置。
protected int $delayedMessagesTimerInterval = 1;
- 延時消息處理間隔(秒)。
protected int $delayedMessagesMaxWorkerCount = 1;
- 延時隊列最大進程數(shù),默認單線程,只會在一個進程開啟延時隊列處理。
protected string $delayedTaskSetKey = '';
- 延時隊列 SET KEY,默認自動生成。
protected string $delayedDataHashKey = '';
- 延時隊列 HASH KEY,默認自動生成。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;
- 消息掛起超時處理策略。
PENDING_PROCESSING_RETRY
或PENDING_PROCESSING_IGNORE
。 PENDING_PROCESSING_RETRY
當消息掛起超時會進行異常重試。PENDING_PROCESSING_IGNORE
當消息掛起超時時,觸發(fā)死信處理
方便排查錯誤,除此之外只清理pending
列表,不做其他處理。- 默認
PENDING_PROCESSING_RETRY
, 根據(jù)隊列場景選擇合適的處理策略,比如發(fā)送短信驗證碼
,當系統(tǒng)出現(xiàn)了崩潰等情況,恢復上線時,一般情況下這類消息時不需要恢復,此時重新給用戶發(fā)送驗證碼沒有意義,但因為Redis Stream
特性,未ack的消息會在pending
列表中不會丟失,這類場景就適合配置PENDING_PROCESSING_IGNORE
protected int $pendingTimout = 300;
- 消息掛起超時時間(秒)。
- 在Redis Stream中當消息被消費者讀取,但沒有確認(ACK)時,消息會處于掛起狀態(tài)進入
pending
列表。 - 如果消息處理緩慢,此值應盡可能調(diào)大,避免將正常處理的消息當成超時處理掉。
protected int $checkPendingTimerInterval = 60;
- 檢查 pending 列表的間隔時間(秒)。
protected int $onceCheckPendingCount = 50;
- 每次檢查 pending 列表的消息數(shù)量。
投遞消息
通過pushMessage
方法可快速向隊列投遞一條消息。
/**
* @param mixed|QueueMessageInterface $data
* @return string|bool
* @throws QueueMessagePushException
*/
public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;
// 消息內(nèi)容,無需序列化
$message = [
'dummy' => 'ok'
];
// 生產(chǎn)者工廠方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
->pushMessage($message);
// 通過消費類工廠方法 創(chuàng)建一個生產(chǎn)者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
// 投遞QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 或者通過QueueMessageFactory創(chuàng)建一條消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);
// 修改隊列數(shù)據(jù)
$message->setData(['dummy' => 'no']);
// 設置錯誤次數(shù)
$message->setFailCount(3);
// 通過上方兩種方法投遞均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
var_export($messageId); // 返回stream的字符串ID 或 false
有時候我們需要一次投遞大量隊列時,可以通過pushMessages
方法,批量投遞消息,此方法會開啟Redis
的pipeline
管道投遞,提高與redis
的交互性能。
/**
* @param array|QueueMessageInterface[] $dataArray
* @return array|false
* @throws QueueMessagePushException
*/
public function pushMessages(array $dataArray): array|bool;
// 投遞5w條消息
$dataArr = array_fill(0, 50000, null);
for ($i = 0; $i < 50000; $i++) {
$dataArr[$i] = ['dummy' => uniqid()];
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
// QueueMessage方式
for ($i = 0; $i < 50000; $i++) {
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);
//$message->setData(json_encode(['123']));
//$message->setFailCount(1);
// ....
$dataArr[$i] = $message;
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
var_export($messageIds); // 返回Stream消息ID列表 或 false
數(shù)組投遞實際是通過數(shù)組創(chuàng)建一個
QueueMessage
對象
延時消息
延時消息的作用:
-
定時任務:
延時消息可以用來實現(xiàn)定時任務。例如,你可能想在未來的某個時間點執(zhí)行特定操作,如發(fā)送提醒、更新狀態(tài)等。 -
延遲處理:
在某些情況下,立即處理消息并不理想或可能。延時消息允許應用程序延遲處理,直到最合適的時機。 -
限流:
延時消息可以幫助對系統(tǒng)內(nèi)部的請求進行限流,防止在短時間內(nèi)因大量請求而過載。 -
解耦和異步處理:
在復雜的系統(tǒng)中,延時消息可以用來解耦不同組件間的直接交互,提高系統(tǒng)的可擴展性和維護性。
通過 scheduleDelayedMessage
方法快速投遞一條延時消息。
/**
* @param mixed|QueueMessageInterface $data
* @param int $delay
* @param string $identifier
* @return bool
* @throws ScheduleDelayedMessageException
*/
public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;
// 消息內(nèi)容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);
// QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 設置延時
$message->setDelay(60);
// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 使用第二個參數(shù)會替換之前對象的延時設置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);
如果我們想避免消息被重復發(fā)送等情況,通過延時隊列的特性可以很簡單實現(xiàn)。通過scheduleDelayedMessage
方法的第三個參數(shù)identifier
傳遞一個自定義的延時消息ID,同樣的消息ID,消息將會被替換,延時時間從修改開始重新計算。
如果消息已經(jīng)進入stream隊列將無法實現(xiàn)替換,必須在延時時間內(nèi),類似實現(xiàn)一個“防抖”效果,消息在時間段內(nèi)發(fā)送多次最終只處理一次。
// 消息內(nèi)容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 通過type,to參數(shù)生成一個唯一ID
$identifier = md5(serialize([
'type' => 'warning',
'to' => 'xxxx@email.com',
]));
// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);
// QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 設置延時
$message->setDelay(60);
// 設置identifier
$message->setIdentifier($identifier);
// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 傳遞參數(shù)會替換對象之前的延時和ID設置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);
當一次需要投遞大量延時消息時,可以通過scheduleDelayedMessages
方法發(fā)送。
// 投遞10w條延時消息
$dataArr = array_fill(0, 100000, null);
for ($i = 0; $i < 100000; $i++) {
$dataArr[$i] = [
'delay' => 2, // 延時時間
'data' => ['dummy' => uniqid()], // 隊列數(shù)據(jù)
'identifier' => '' // 自定義ID
];
}
// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
// QueueMessage對象
for ($i = 0; $i < 100000; $i++) {
$message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);
// 設置延時
$message->setDelay(60);
// 設置identifier
$message->setIdentifier('');
$dataArr[$i] = $message;
}
// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
多redis只需要在隊列配置
connection
連接標識,投遞方式?jīng)]有任何變化。
移除延時隊列消息
新功能 (v1.0.1)
以下功能在插件的
v1.0.1
版本中新增。
有時候我們想移除某個或多個延時隊列時,可以使用removeDelayedMessage
和removeDelayedMessages
方法實現(xiàn),使用hasDelayedMessageExists
和hasDelayedMessagesExist
判斷一條或多條延時消息是否存在。
只有任務還存在延時隊列中才能移除,如果已經(jīng)進入
Stream
隊列中將無法移除。
/**
* @param string $identifier
* @return bool
*/
public function removeDelayedMessage(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function removeDelayedMessages(array $identifiers): array|bool;
/**
* @param string $identifier
* @return bool
*/
public function hasDelayedMessageExists(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function hasDelayedMessagesExist(array $identifiers): array|bool;
代碼示例:
$consumer = new app\queue\test\SendEmail();
$queueProducer = $consumer::createQueueProducer();
// 添加一條延時消息 通過業(yè)務數(shù)據(jù)生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');
// 通過QueueMessage對象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);
$queueMessage->setDelay(60);
// 自定義消息ID 不設置將默認生成 通過getIdentifier()獲取
$queueMessage->setIdentifier('test_id');
// 獲取消息ID
$id = $queueMessage->getIdentifier();
// 判斷消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier')); // true or false
// 移除一條延時消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false
// 判斷多條消息是否存在 返回一個數(shù)組
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));
//.array (
// 0 => 1706383223.0,
// 1 => 1706383223.0,
// 2 => false
//).
// 移除多條延時消息 返回一個數(shù)組
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));
//
//.array (
// 0 => 1,
// 1 => 1,
// 2 => 0
//).
消費消息
消費消息時會調(diào)用消費類的consume
方法,并傳遞一個實現(xiàn)ConsumerMessageInterface
接口對象。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標識,對應redis.php的配置 默認default
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊列數(shù)據(jù)
$data = $consumerMessage->getData();
// 禁用錯誤重試 如果消費失敗將不會異常重試
$consumerMessage->disableFailRetry();
// 手動觸發(fā)錯誤重試,此方法會調(diào)用disableFailRetry方法,所以后續(xù)報錯不會再觸發(fā)異常重試。
// 沒有禁用錯誤重試的情況下,消費異常默認會調(diào)用此方法。
$consumerMessage->triggerError(new \Exception('triggerError'));
// 監(jiān)聽消費異常事件
$consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
// 這里可以處理消費異常邏輯
// 禁用錯誤重試
$consumerMessage->disableFailRetry();
// 添加日志等等
// 如果在消費方法中自行捕獲 Throwable 此事件不會觸發(fā)
});
// 業(yè)務邏輯執(zhí)行完畢,ack確認消息 默認自動ack,但通常建議在業(yè)務邏輯中顯式調(diào)用,比如ack失敗進行事務回滾等等。
$isAcked = $consumerMessage->ack();
if (!$isAcked) {
}
// 或通過getAckStatus方法獲取結(jié)果
if (!$consumerMessage->getAckStatus()) {
}
// 獲取原始隊列消息 QueueMessage對象
$queueMessage = $consumerMessage->getQueueMessage();
// 獲取消息錯誤次數(shù)...
$failCount = $queueMessage->getFailCount();
// 更多...
}
}
上方示例主要演示可調(diào)用的方法,下面使用一個更加貼合實際的demo,更快了解消費業(yè)務邏輯的編寫。
發(fā)送郵件驗證碼
場景特點:獲取驗證碼的操作一般由用戶手動觸發(fā),在這類場景中,錯誤重試應用戶在前端UI倒計時結(jié)束后重新手動發(fā)起,如果業(yè)務出現(xiàn)崩潰,再次上線后重新發(fā)送驗證碼給用戶已經(jīng)沒有意義了。我們可以通過配置適應這類場景,代碼示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標識,對應redis.php的配置 默認default
protected string $connection = 'default';
// 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時將不會進行重試
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊列數(shù)據(jù)
$data = $consumerMessage->getData();
// 監(jiān)聽異常
$consumerMessage->onError(function (\Throwable $e){
// 記錄郵件發(fā)送失敗日志
});
// 禁用重試
$consumerMessage->disableFailRetry();
// 發(fā)送一封郵件 ....
// 確認消息
$consumerMessage->ack();
}
}
自定義錯誤重試
消費類繼承的抽象類Consumer
默認實現(xiàn)了handlerFailRetry
方法,在觸發(fā)異常重試時,會調(diào)用此方法,如果您想自定義錯誤重試邏輯,或加入更多自定義的處理,在本插件中可以輕松實現(xiàn),并且每個隊列都支持自定義配置。
/**
* 處理錯誤重試
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return bool
* @throws ScheduleDelayedMessageException
* @throws RedisException
* @throws Throwable
*/
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
$queueMessage = $consumerMessage->getQueueMessage();
// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
$queueMessage->incrementFailCount(); // Fail count + 1
// 計算下次重試時間
$retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;
// 更新下次重試時間
$queueMessage->updateNextRetry($retrySeconds);
// 設置消息延時
$queueMessage->setDelay($retrySeconds);
// 設置消息ID 避免重復任務
$queueMessage->setIdentifier($messageId);
// 重新發(fā)布至延時隊列
return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
}
默認實現(xiàn)的代碼如上,我們只需要重寫此方法就可以自定義錯誤處理的業(yè)務邏輯。
代碼示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;
class SendEmail extends Consumer
{
// 連接標識,對應redis.php的配置 默認default
protected string $connection = 'default';
// 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時將不會進行重試
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊列數(shù)據(jù)
$data = $consumerMessage->getData();
// 監(jiān)聽異常
$consumerMessage->onError(function (\Throwable $e){
// 記錄郵件發(fā)送失敗日志
});
// 禁用重試
$consumerMessage->disableFailRetry();
// 發(fā)送一封郵件 ....
// 確認消息
$consumerMessage->ack();
}
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
// 不改動原本的錯誤處理 也可以完全自定義實現(xiàn)。
parent::handlerFailRetry($messageId, $consumerMessage, $e);
// 如果隊列在業(yè)務數(shù)據(jù)庫中還有一個tasks表進行調(diào)度,在這里可以更新task數(shù)據(jù) 比如 錯誤次數(shù)+1
}
}
自定義死信處理
在handlerFailRetry
方法中,默認有這一段:
// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
那么,我們?nèi)绻枰远x死信處理或加入額外的業(yè)務邏輯可以通過重寫handlerDeadLetterQueue
方法實現(xiàn)。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
當我們設置pending處理策略為PENDING_PROCESSING_IGNORE
時,消息如果掛起超時,將不會觸發(fā)異常重試,而是直接調(diào)用死信處理。默認情況下,死信處理會新增一條日志,方便排查問題。
默認情況下需要配置有效的日志(log.php) 默認行為才有效。也可以通過重寫方法完全自行實現(xiàn),記錄在業(yè)務的數(shù)據(jù)庫中,這也是推薦的做法,可以針對業(yè)務實現(xiàn)更加靈活的異常處理。
/**
* 處理死信 超過最大重試次數(shù)或pending超時PENDING_PROCESSING_IGNORE策略 會調(diào)用此方法
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return void
*/
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
$queueMessage = $consumerMessage->getQueueMessage();
// 添加日志
LogUtility::warning('dead_letter_queue: ', [
'messageId' => $messageId,
'message' => $queueMessage->toArray(),
'failCount' => $queueMessage->getFailCount(),
'errorMsg' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
// 更多...
}
代碼示例:
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
// 保持默認行為
parent::handlerDeadLetterQueue($messageId, $consumerMessage, $e); // TODO: Change the autogenerated stub
// 如果隊列在業(yè)務數(shù)據(jù)庫中還有一個tasks表進行調(diào)度,在這里可以更新task數(shù)據(jù)
}
自定義pending超時處理
抽象類Consumer
中,默認定義了handlerPendingTimeoutMessages
方法,用于處理pending超時的消息。
消費者讀取了一條消息后,消息會進入pending
列表,不會被當前和其他消費者再次讀取,當業(yè)務邏輯沒有執(zhí)行完畢,服務出現(xiàn)掉線,崩潰時,消息并沒有ack
,消息會一直保存在pending
列表中,pending
列表只能通過ack
移除,如果長期不處理,可能造成pending
列表堆積,造成大量內(nèi)存占用,當持續(xù)時間大于$pendingTimout
屬性的時間(默認300秒),會調(diào)用此方法進行處理。
默認情況下,在
PENDING_PROCESSING_IGNORE
策略中,我們認為pending超時消息是死信,不會再次處理,PENDING_PROCESSING_RETRY
會進行異常重試。
/**
* 處理消息掛起超時 當pending列表中有超時未ack的消息會觸發(fā)此方法
* @param string $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param string $consumerName
* @param int $elapsedTime
* @param int $deliveryCount
* @return void
* @throws RedisException
* @throws ScheduleDelayedMessageException
* @throws Throwable
*/
public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
{
switch ($this->getPendingProcessingStrategy()) {
case self::PENDING_PROCESSING_IGNORE: // 忽略pending超時
// 確認消息
$consumerMessage->ack();
// 觸發(fā)死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
'PENDING_PROCESSING_IGNORE: Message pending timeout.'
));
break;
case self::PENDING_PROCESSING_RETRY: // pending超時重試
// 觸發(fā)死信處理
if ($deliveryCount + 1 > $this->getMaxAttempts()) {
// ack消息
$consumerMessage->ack();
$this->handlerDeadLetterQueue(
$messageId,
$consumerMessage,
new Exception(
'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
));
return;
}
// 處理重試
$handlerStatus = $this->handlerFailRetry(
$messageId,
$consumerMessage,
new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
);
if ($handlerStatus) {
$consumerMessage->ack();
}
break;
}
}
獲取隊列的redis連接
有時候我們需要操作或維護隊列時,可以直接獲取隊列的Redis連接進行操作,比如編寫自定義腳本等。
// 獲取隊列的Redis連接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis擴展一致
$redisConnection->xLen();
$redisConnection->sAdd();
// 在消費類中可以直接使用$this->getRedisConnection();
....更多
命令行
php webman solar:make:consumer
- 創(chuàng)建一個消費者
- 它將引導你創(chuàng)建一個基本的消費者類
php webman solar:remove:consumer
- 移除一個消費者
- 它將引導你移除消費者類
- 注意:它會移除redis中關(guān)于此消費者的所有數(shù)據(jù),如果你只是想移除類和配置,請不要使用此命令。
php webman solar:clean:redis:data
- 清理某個消費者的Redis數(shù)據(jù)
- 它將引導你清理redis數(shù)據(jù)
- 注意:它將刪除redis中關(guān)于此消費者的所有數(shù)據(jù),請謹慎操作。
php webman solar:consumer:list
- 獲取當前全部消費者信息,包含如下信息:
Key
標識Handler
進程類Count
進程數(shù)Consumer
消費者類名Stream Length
當前隊列總長度(不包含Pending列表中的數(shù)量)Delay Set Length
當前延時隊列任務數(shù)Pending List Length
當前Pending列表長度
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| Key | Handler | Count | Consumer | Stream Length | Delay Set Length | Pending List Length |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| SendCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendCode | 1996 | 950 | >=500 |
| SendEmail | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendEmail | 0 | 0 | 0 |
| SendSmsCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 1 | app\queue\SendSmsCode | 0 | 0 | 0 |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
處理歷史消息
使用場景:
- 在極端情況下業(yè)務執(zhí)行完畢并且ack成功,但是刪除消息時出現(xiàn)異常,消息保留在
stream
中,一般少量數(shù)據(jù)時我們無需在意,但如果堆積數(shù)量過大可能造成內(nèi)存占用和性能問題。 - 當你需要處理歷史消息,或者重新處理之前已經(jīng)處理過的消息。
- 當你需要對 Stream 中的歷史數(shù)據(jù)進行分析或生成報告。
當
autoDel
屬性為true
時,消息會自動刪除,無法對歷史數(shù)據(jù)進行處理和分析,如果業(yè)務需要對歷史隊列消息進行回溯請設置為false
代碼示例:
這里我們使用了webman
中自定義腳本
的編寫,可以將腳本加入定時任務中,定期清理或處理歷史消息。
下方代碼只是示例,請確保在測試環(huán)境充分測試。
<?php
use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';
// 獲取隊列的Redis連接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis擴展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示從 Stream 的最開始讀取
$end = '+'; // 表示讀取到 Stream 的最末尾
$count = 100; // 指定要讀取的消息數(shù)量
// 讀取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);
$deleteMessageIds = [];
foreach ($messages as $messageId => $message) {
// 解析原始消息內(nèi)容
$messageArr = QueueMessage::parseRawMessage($message);
if (!$messageArr) { // 未知消息
$deleteMessageIds[] = $messageId;
continue;
}
// 轉(zhuǎn)換為QueueMessage方便操作
$queueMessage = QueueMessage::createFromArray($messageArr);
// 通過獲取消息時間戳,如果消息已經(jīng)存在超過1個小時 標記刪除。
if (time() - $queueMessage->getTimestamp() > 3600) {
$deleteMessageIds[] = $messageId;
}
}
// 批量刪除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);
在其他項目投遞消息
目前插件沒有實現(xiàn)在其他項目投遞的標準實現(xiàn),可通過業(yè)務需求開發(fā)隊列提交接口實現(xiàn)。