国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

[solarseahorse] redis消息隊列插件

v1.0.1 版本
2024-01-27 版本更新時間
379 安裝
6 star

Webman Redis Queue 插件

簡介

webman-redis-queue 是為 Webman 框架設計的高效、靈活的 Redis
隊列插件。利用 Redis Stream 的強大特性,該插件專注于提供可靠和高性能的消息隊列解決方案,適合處理大規(guī)模的數(shù)據(jù)流和復雜的隊列操作。

主要特性

  • 基于 Redis Stream: 使用 Redis 最新的 Stream 數(shù)據(jù)類型,為消息隊列和事件流提供優(yōu)化的存儲和訪問。
  • 自定義異常重試: 支持自定義的消息處理失敗重試機制,提高消息處理的可靠性。
  • 死信隊列處理: 集成死信隊列管理,確保消息不會因處理失敗而丟失。
  • 延時隊列支持: 實現(xiàn)延時消息處理,使得定時任務和延遲執(zhí)行變得簡單易行。
  • 高效的異常處理機制: 強化的異常處理策略,確保隊列的穩(wěn)定運行。

安裝

通過 Composer 安裝 webman-redis-queue

composer require solarseahorse/webman-redis-queue:^1.0.1

版本變更記錄

v1.0.1 (20240128)

新增功能

  • 刪除延時消息
    新增 removeDelayedMessage 方法,允許移除一條延時消息。

  • 批量刪除延時消息
    新增 removeDelayedMessages 方法,允許一次性移除多個指定的延時消息。

  • 檢查延時消息存在性
    新增 hasDelayedMessageExists 方法,用于檢查延時消息是否存在。

  • 批量檢查延時消息存在性
    新增 hasDelayedMessagesExist 方法,用于批量檢查多個延時消息是否存在。

異常處理

  • 引入新的異常類型
    為延時消息的移除和存在性檢查操作引入了 DelayedMessageRemoveExceptionDelayedMessageCheckException 異常類型。

文檔修正

  • 修正文檔中的幾處錯誤
    對插件的官方文檔進行了更新,修正了之前版本中存在的一些描述不準確和排版錯誤。

測試和反饋

我們非常歡迎并鼓勵您在測試環(huán)境中嘗試這個插件,并且分享您的使用體驗。您的反饋對我們改進插件、修復潛在的問題以及發(fā)布未來的穩(wěn)定版本非常重要。如果您在使用過程中遇到任何問題或有任何建議,請通過 GitHub Issues
與我聯(lián)系。

參與貢獻

如果您對改進 webman-redis-queue 有興趣,歡迎任何形式的貢獻,包括但不限于:提交問題、提供反饋、或直接向代碼庫提交改進。您的貢獻將幫助我們更快地推出穩(wěn)定、功能豐富的正式版本。

配置

配置文件自動生成在 config/plugin/solarseahorse/webman-redis-queue目錄下。

1. Redis配置 redis.php

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => null,       // 密碼,字符串類型,可選參數(shù)
            'db' => 0,            // 數(shù)據(jù)庫
            'prefix' => 'webman_redis_queue_',      // key 前綴
            'timeout' => 2, // Timeout
            'ping' => 55,             // Ping
            'reconnect' => true,  // 斷線重連
            'max_retries' => 5, // 最大重連次數(shù)
            'retry_interval' => 5 , // 重連間隔 s
        ]
    ],
];

在webman集群下,每個節(jié)點需要連接同一個redis。

斷線重連

注意:開啟此選項能增加隊列運行穩(wěn)定性,但如果隊列進程過多,redis恢復后可能造成突發(fā)大量連接數(shù),因為每個進程都有一個redis連接。

默認開啟,當Redis發(fā)生重載,重啟等情況會嘗試重連,超過最大重試次數(shù)后會報錯并重啟進程(webman默認行為)。

2. 日志配置 log.php

推薦為插件配置單獨日志通道,參考鏈接 webman日志


<?php

return [
    'enable' => true, // 啟用日志
    'handlers' => support\Log::channel('default') // 默認通道 default
];

在隊列消費業(yè)務邏輯中可以這樣使用日志,使用方法和官方的Log類使用方法一致。


LogUtility::warning('Error:', [
      'data' => $consumerMessage->getData(),
      'errorMessage' => $e->getMessage()
]);

4. 隊列配置 process.php

  1. 在加載類名模式下,每個隊列都擁有獨立的運行進程。
  2. 每個隊列的配置和數(shù)據(jù)存儲KEY都是獨立的。
  3. 不推薦目錄模式是因為多個隊列共享進程,其中某個隊列出現(xiàn)異??赡苡绊懙狡渌犃?。
  4. 隊列的詳細配置都在消費類中配置,配置文件只是基本的進程配置。
<?php

return [
    'send-email' => [
        'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
        'count' => 20, // 在目錄模式中,目錄下所有隊列是共用進程
        'constructor' => [
            // 支持目錄和類 推薦使用類名
            'consumer_source' => \App\queue\test\Email::class
        ]
    ]
];

定義消費類

插件對消費類對位置沒有固定要求,符合加載規(guī)范即可。

教程以app/queue/SendEmail.php舉例,目錄和文件需自行創(chuàng)建。

繼承 SolarSeahorse\WebmanRedisQueue\Consumer,配置連接標識,并實現(xiàn)抽象方法consume, 一個最基礎的消費類就創(chuàng)建好了。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標識,對應config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.
    }
}

編寫完成后需要在隊列配置文件process.php中新增隊列配置。

通過命令行創(chuàng)建

通過 php webman solar:make:consumer 命令可快速創(chuàng)建一個消費類。

示例操作:

webman % php webman solar:make:consumer

Please enter the name of the queue: sendCode

Please enter the number of processes (default 1): 1

Please enter the path to create the class in [app/queue]: app/queue/test

最終將會在 app/queue/test 目錄中創(chuàng)建 SendCode.php 文件。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendCode extends Consumer
{
    // 連接標識,對應config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    // 消費
    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊列數(shù)據(jù)
        $data = $consumerMessage->getData();

        var_dump($messageId);
    }
}

隊列配置文件process.php也會自動更新。

<?php

return array (
  'sendCode' => 
  array (
    'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
    'count' => 1,
    'constructor' => 
    array (
      'consumer_source' => 'app\\queue\\test\\SendCode',
    ),
  ),
);

配置屬性

protected string $connection = 'default';

  • 連接標識,用于指定 Redis 連接配置。

protected string $queueName = '';

  • 隊列名稱,默認自動生成。

protected string $groupName = '';

  • 隊列分組名,默認自動生成。

protected string $streamKey = '';

  • Stream key,默認自動生成。

protected int $prefetchCount = 1;

  • 返回消息的最大數(shù)量。默認為1 不建議修改
  • 消費速度可通過提高進程數(shù)并行處理消息,消費者每次讀取多條數(shù)據(jù)是循環(huán)消費,極端情況如循環(huán)消費一半進程重啟會造成大量消息掛起。

protected int $blockTime = 5000;

  • 當無消息時堵塞等待的毫秒數(shù),也可作為無消息時的休眠時長。如果隊列以延時隊列為主,應與延時隊列間隔相近。

protected float $consumerTimerInterval = 0.5;

  • 消費者處理間隔,消費完一條消息后的等待時間(秒)。

protected int $maxAttempts = 5;

  • 消費失敗后的最大重試次數(shù)。

protected int $retrySeconds = 60;

  • 重試間隔(秒)。

protected bool $autoAck = true;

  • 是否自動確認消息。開啟的同時同樣建議在業(yè)務邏輯中顯式調(diào)用 ack方法。

protected bool $autoDel = true;

  • 是否自動刪除已確認成功的消息。

protected int $delayedQueueOnceHandlerCount = 128;

  • 延時隊列每次處理數(shù)量,根據(jù)生產(chǎn)速率適當配置。

protected int $delayedMessagesTimerInterval = 1;

  • 延時消息處理間隔(秒)。

protected int $delayedMessagesMaxWorkerCount = 1;

  • 延時隊列最大進程數(shù),默認單線程,只會在一個進程開啟延時隊列處理。

protected string $delayedTaskSetKey = '';

  • 延時隊列 SET KEY,默認自動生成。

protected string $delayedDataHashKey = '';

  • 延時隊列 HASH KEY,默認自動生成。

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;

  • 消息掛起超時處理策略。PENDING_PROCESSING_RETRYPENDING_PROCESSING_IGNORE
  • PENDING_PROCESSING_RETRY 當消息掛起超時會進行異常重試。
  • PENDING_PROCESSING_IGNORE 當消息掛起超時時,觸發(fā)死信處理方便排查錯誤,除此之外只清理pending列表,不做其他處理。
  • 默認 PENDING_PROCESSING_RETRY , 根據(jù)隊列場景選擇合適的處理策略,比如發(fā)送短信驗證碼
    ,當系統(tǒng)出現(xiàn)了崩潰等情況,恢復上線時,一般情況下這類消息時不需要恢復,此時重新給用戶發(fā)送驗證碼沒有意義,但因為Redis Stream特性,未ack的消息會在pending列表中不會丟失,這類場景就適合配置PENDING_PROCESSING_IGNORE

protected int $pendingTimout = 300;

  • 消息掛起超時時間(秒)。
  • 在Redis Stream中當消息被消費者讀取,但沒有確認(ACK)時,消息會處于掛起狀態(tài)進入pending列表。
  • 如果消息處理緩慢,此值應盡可能調(diào)大,避免將正常處理的消息當成超時處理掉。

protected int $checkPendingTimerInterval = 60;

  • 檢查 pending 列表的間隔時間(秒)。

protected int $onceCheckPendingCount = 50;

  • 每次檢查 pending 列表的消息數(shù)量。

投遞消息

通過pushMessage方法可快速向隊列投遞一條消息。


/**
 * @param mixed|QueueMessageInterface $data
 * @return string|bool
 * @throws QueueMessagePushException
 */

 public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;

// 消息內(nèi)容,無需序列化
$message = [
    'dummy' => 'ok'
];

// 生產(chǎn)者工廠方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
    ->pushMessage($message);

// 通過消費類工廠方法 創(chuàng)建一個生產(chǎn)者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

// 投遞QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);

// 或者通過QueueMessageFactory創(chuàng)建一條消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);

// 修改隊列數(shù)據(jù)
$message->setData(['dummy' => 'no']);

// 設置錯誤次數(shù)
$message->setFailCount(3);

// 通過上方兩種方法投遞均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

var_export($messageId); // 返回stream的字符串ID 或 false

有時候我們需要一次投遞大量隊列時,可以通過pushMessages方法,批量投遞消息,此方法會開啟Redispipeline
管道投遞,提高與redis的交互性能。


/**
 * @param array|QueueMessageInterface[] $dataArray
 * @return array|false
 * @throws QueueMessagePushException
 */

 public function pushMessages(array $dataArray): array|bool;

// 投遞5w條消息
$dataArr = array_fill(0, 50000, null);

for ($i = 0; $i < 50000; $i++) {
    $dataArr[$i] = ['dummy' => uniqid()];
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);

// QueueMessage方式

for ($i = 0; $i < 50000; $i++) {

    $message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);

    //$message->setData(json_encode(['123']));
    //$message->setFailCount(1);
    // ....

    $dataArr[$i] = $message;
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);

var_export($messageIds); // 返回Stream消息ID列表 或 false

數(shù)組投遞實際是通過數(shù)組創(chuàng)建一個QueueMessage對象

延時消息

延時消息的作用:

  1. 定時任務:
    延時消息可以用來實現(xiàn)定時任務。例如,你可能想在未來的某個時間點執(zhí)行特定操作,如發(fā)送提醒、更新狀態(tài)等。

  2. 延遲處理:
    在某些情況下,立即處理消息并不理想或可能。延時消息允許應用程序延遲處理,直到最合適的時機。

  3. 限流:
    延時消息可以幫助對系統(tǒng)內(nèi)部的請求進行限流,防止在短時間內(nèi)因大量請求而過載。

  4. 解耦和異步處理:
    在復雜的系統(tǒng)中,延時消息可以用來解耦不同組件間的直接交互,提高系統(tǒng)的可擴展性和維護性。

通過 scheduleDelayedMessage 方法快速投遞一條延時消息。

    /**
     * @param mixed|QueueMessageInterface $data
     * @param int $delay
     * @param string $identifier
     * @return bool
     * @throws ScheduleDelayedMessageException
     */
    public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;

// 消息內(nèi)容
$message = [
    'type' => 'warning',
    'to' => 'xxxx@email.com',
    'content' => '.....'
];

// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);

// QueueMessage對象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 設置延時
$message->setDelay(60);

// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 使用第二個參數(shù)會替換之前對象的延時設置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);

如果我們想避免消息被重復發(fā)送等情況,通過延時隊列的特性可以很簡單實現(xiàn)。通過scheduleDelayedMessage
方法的第三個參數(shù)identifier傳遞一個自定義的延時消息ID,同樣的消息ID,消息將會被替換,延時時間從修改開始重新計算。

如果消息已經(jīng)進入stream隊列將無法實現(xiàn)替換,必須在延時時間內(nèi),類似實現(xiàn)一個“防抖”效果,消息在時間段內(nèi)發(fā)送多次最終只處理一次。


// 消息內(nèi)容
$message = [
    'type' => 'warning',
    'to' => 'xxxx@email.com',
    'content' => '.....'
];

// 通過type,to參數(shù)生成一個唯一ID
$identifier = md5(serialize([
    'type' => 'warning',
    'to' => 'xxxx@email.com',
]));

// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);

// QueueMessage對象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 設置延時
$message->setDelay(60);

// 設置identifier
$message->setIdentifier($identifier);

// 投遞一條延時消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 傳遞參數(shù)會替換對象之前的延時和ID設置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);

當一次需要投遞大量延時消息時,可以通過scheduleDelayedMessages方法發(fā)送。


// 投遞10w條延時消息
$dataArr = array_fill(0, 100000, null);

for ($i = 0; $i < 100000; $i++) {
    $dataArr[$i] = [
        'delay' => 2, // 延時時間
        'data' => ['dummy' => uniqid()], // 隊列數(shù)據(jù)
        'identifier' => '' // 自定義ID
    ];
}

// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);

// QueueMessage對象

for ($i = 0; $i < 100000; $i++) {

    $message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);

    // 設置延時
    $message->setDelay(60);

    // 設置identifier
    $message->setIdentifier('');

    $dataArr[$i] = $message;
}

// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);

多redis只需要在隊列配置connection連接標識,投遞方式?jīng)]有任何變化。

移除延時隊列消息

新功能 (v1.0.1)

以下功能在插件的 v1.0.1 版本中新增。

有時候我們想移除某個或多個延時隊列時,可以使用removeDelayedMessageremoveDelayedMessages
方法實現(xiàn),使用hasDelayedMessageExistshasDelayedMessagesExist判斷一條或多條延時消息是否存在。

只有任務還存在延時隊列中才能移除,如果已經(jīng)進入Stream隊列中將無法移除。

    /**
     * @param string $identifier
     * @return bool
     */
    public function removeDelayedMessage(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function removeDelayedMessages(array $identifiers): array|bool;

    /**
     * @param string $identifier
     * @return bool
     */
    public function hasDelayedMessageExists(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function hasDelayedMessagesExist(array $identifiers): array|bool;

代碼示例:

$consumer = new app\queue\test\SendEmail();

$queueProducer = $consumer::createQueueProducer();

// 添加一條延時消息 通過業(yè)務數(shù)據(jù)生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');

// 通過QueueMessage對象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);

$queueMessage->setDelay(60);

// 自定義消息ID 不設置將默認生成 通過getIdentifier()獲取
$queueMessage->setIdentifier('test_id');

// 獲取消息ID
$id = $queueMessage->getIdentifier();

// 判斷消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier'));  // true or false

// 移除一條延時消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false

// 判斷多條消息是否存在 返回一個數(shù)組
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));

//.array (
//    0 => 1706383223.0,
//    1 => 1706383223.0,
//    2 => false
//).

// 移除多條延時消息 返回一個數(shù)組
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));

//
//.array (
//    0 => 1,
//    1 => 1,
//    2 => 0
//).  

消費消息

消費消息時會調(diào)用消費類的consume方法,并傳遞一個實現(xiàn)ConsumerMessageInterface接口對象。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標識,對應redis.php的配置 默認default
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 禁用錯誤重試 如果消費失敗將不會異常重試
        $consumerMessage->disableFailRetry();

        // 手動觸發(fā)錯誤重試,此方法會調(diào)用disableFailRetry方法,所以后續(xù)報錯不會再觸發(fā)異常重試。
        // 沒有禁用錯誤重試的情況下,消費異常默認會調(diào)用此方法。
        $consumerMessage->triggerError(new \Exception('triggerError'));

        // 監(jiān)聽消費異常事件
        $consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
            // 這里可以處理消費異常邏輯

            // 禁用錯誤重試
            $consumerMessage->disableFailRetry();

            // 添加日志等等
            // 如果在消費方法中自行捕獲 Throwable 此事件不會觸發(fā)
        });

        // 業(yè)務邏輯執(zhí)行完畢,ack確認消息 默認自動ack,但通常建議在業(yè)務邏輯中顯式調(diào)用,比如ack失敗進行事務回滾等等。
        $isAcked = $consumerMessage->ack();

        if (!$isAcked) {

        }

        // 或通過getAckStatus方法獲取結(jié)果
        if (!$consumerMessage->getAckStatus()) {

        }

        // 獲取原始隊列消息 QueueMessage對象
        $queueMessage = $consumerMessage->getQueueMessage();

        // 獲取消息錯誤次數(shù)...
        $failCount = $queueMessage->getFailCount();
        // 更多...
    }
}

上方示例主要演示可調(diào)用的方法,下面使用一個更加貼合實際的demo,更快了解消費業(yè)務邏輯的編寫。

發(fā)送郵件驗證碼

場景特點:獲取驗證碼的操作一般由用戶手動觸發(fā),在這類場景中,錯誤重試應用戶在前端UI倒計時結(jié)束后重新手動發(fā)起,如果業(yè)務出現(xiàn)崩潰,再次上線后重新發(fā)送驗證碼給用戶已經(jīng)沒有意義了。我們可以通過配置適應這類場景,代碼示例:

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標識,對應redis.php的配置 默認default
    protected string $connection = 'default';

    // 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時將不會進行重試
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 監(jiān)聽異常
        $consumerMessage->onError(function (\Throwable $e){

            // 記錄郵件發(fā)送失敗日志
        });

        // 禁用重試
        $consumerMessage->disableFailRetry();

        // 發(fā)送一封郵件 ....

        // 確認消息
        $consumerMessage->ack();
    }
}

自定義錯誤重試

消費類繼承的抽象類Consumer默認實現(xiàn)了handlerFailRetry
方法,在觸發(fā)異常重試時,會調(diào)用此方法,如果您想自定義錯誤重試邏輯,或加入更多自定義的處理,在本插件中可以輕松實現(xiàn),并且每個隊列都支持自定義配置。

/**
     * 處理錯誤重試
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return bool
     * @throws ScheduleDelayedMessageException
     * @throws RedisException
     * @throws Throwable
     */
    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 檢查是否超過最大重試次數(shù)
        if ($queueMessage->getFailCount() >= $this->maxAttempts) {
            // 死信處理
            $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
            return true;
        }

        $queueMessage->incrementFailCount(); // Fail count + 1

        // 計算下次重試時間
        $retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;

        // 更新下次重試時間
        $queueMessage->updateNextRetry($retrySeconds);

        // 設置消息延時
        $queueMessage->setDelay($retrySeconds);

        // 設置消息ID 避免重復任務
        $queueMessage->setIdentifier($messageId);

        // 重新發(fā)布至延時隊列
        return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
    }

默認實現(xiàn)的代碼如上,我們只需要重寫此方法就可以自定義錯誤處理的業(yè)務邏輯。

代碼示例:

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;

class SendEmail extends Consumer
{
    // 連接標識,對應redis.php的配置 默認default
    protected string $connection = 'default';

    // 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時將不會進行重試
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 監(jiān)聽異常
        $consumerMessage->onError(function (\Throwable $e){

            // 記錄郵件發(fā)送失敗日志
        });

        // 禁用重試
        $consumerMessage->disableFailRetry();

        // 發(fā)送一封郵件 ....

        // 確認消息
        $consumerMessage->ack();
    }

    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        // 不改動原本的錯誤處理 也可以完全自定義實現(xiàn)。
        parent::handlerFailRetry($messageId, $consumerMessage, $e);

       // 如果隊列在業(yè)務數(shù)據(jù)庫中還有一個tasks表進行調(diào)度,在這里可以更新task數(shù)據(jù) 比如 錯誤次數(shù)+1
    }
}

自定義死信處理

handlerFailRetry方法中,默認有這一段:


// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
    // 死信處理
    $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
    return true;
}

那么,我們?nèi)绻枰远x死信處理或加入額外的業(yè)務邏輯可以通過重寫handlerDeadLetterQueue方法實現(xiàn)。

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
當我們設置pending處理策略為PENDING_PROCESSING_IGNORE
時,消息如果掛起超時,將不會觸發(fā)異常重試,而是直接調(diào)用死信處理。默認情況下,死信處理會新增一條日志,方便排查問題。

默認情況下需要配置有效的日志(log.php) 默認行為才有效。也可以通過重寫方法完全自行實現(xiàn),記錄在業(yè)務的數(shù)據(jù)庫中,這也是推薦的做法,可以針對業(yè)務實現(xiàn)更加靈活的異常處理。

    /**
     * 處理死信 超過最大重試次數(shù)或pending超時PENDING_PROCESSING_IGNORE策略 會調(diào)用此方法
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return void
     */
    public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 添加日志
        LogUtility::warning('dead_letter_queue: ', [
            'messageId' => $messageId,
            'message' => $queueMessage->toArray(),
            'failCount' => $queueMessage->getFailCount(),
            'errorMsg' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);

        // 更多...
    }

代碼示例:

public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
    // 保持默認行為
    parent::handlerDeadLetterQueue($messageId, $consumerMessage, $e); // TODO: Change the autogenerated stub

    // 如果隊列在業(yè)務數(shù)據(jù)庫中還有一個tasks表進行調(diào)度,在這里可以更新task數(shù)據(jù)
}

自定義pending超時處理

抽象類Consumer中,默認定義了handlerPendingTimeoutMessages方法,用于處理pending超時的消息。

消費者讀取了一條消息后,消息會進入pending
列表,不會被當前和其他消費者再次讀取,當業(yè)務邏輯沒有執(zhí)行完畢,服務出現(xiàn)掉線,崩潰時,消息并沒有ack,消息會一直保存在pending
列表中,pending列表只能通過ack移除,如果長期不處理,可能造成pending
列表堆積,造成大量內(nèi)存占用,當持續(xù)時間大于$pendingTimout屬性的時間(默認300秒),會調(diào)用此方法進行處理。

默認情況下,在PENDING_PROCESSING_IGNORE策略中,我們認為pending超時消息是死信,不會再次處理,PENDING_PROCESSING_RETRY
會進行異常重試。


     /**
     * 處理消息掛起超時 當pending列表中有超時未ack的消息會觸發(fā)此方法
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param string $consumerName
     * @param int $elapsedTime
     * @param int $deliveryCount
     * @return void
     * @throws RedisException
     * @throws ScheduleDelayedMessageException
     * @throws Throwable
     */
    public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
    {
        switch ($this->getPendingProcessingStrategy()) {
            case self::PENDING_PROCESSING_IGNORE: // 忽略pending超時

                // 確認消息
                $consumerMessage->ack();

                // 觸發(fā)死信處理
                $this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
                    'PENDING_PROCESSING_IGNORE: Message pending timeout.'
                ));
                break;
            case self::PENDING_PROCESSING_RETRY: // pending超時重試

                // 觸發(fā)死信處理
                if ($deliveryCount + 1 > $this->getMaxAttempts()) {

                    // ack消息
                    $consumerMessage->ack();

                    $this->handlerDeadLetterQueue(
                        $messageId,
                        $consumerMessage,
                        new Exception(
                            'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
                        ));

                    return;
                }

                // 處理重試
                $handlerStatus = $this->handlerFailRetry(
                    $messageId,
                    $consumerMessage,
                    new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
                );

                if ($handlerStatus) {
                    $consumerMessage->ack();
                }
                break;
        }
    }

獲取隊列的redis連接

有時候我們需要操作或維護隊列時,可以直接獲取隊列的Redis連接進行操作,比如編寫自定義腳本等。


// 獲取隊列的Redis連接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis擴展一致

$redisConnection->xLen();

$redisConnection->sAdd();

// 在消費類中可以直接使用$this->getRedisConnection();

....更多

命令行

php webman solar:make:consumer

  • 創(chuàng)建一個消費者
  • 它將引導你創(chuàng)建一個基本的消費者類

php webman solar:remove:consumer

  • 移除一個消費者
  • 它將引導你移除消費者類
  • 注意:它會移除redis中關(guān)于此消費者的所有數(shù)據(jù),如果你只是想移除類和配置,請不要使用此命令。

php webman solar:clean:redis:data

  • 清理某個消費者的Redis數(shù)據(jù)
  • 它將引導你清理redis數(shù)據(jù)
  • 注意:它將刪除redis中關(guān)于此消費者的所有數(shù)據(jù),請謹慎操作。

php webman solar:consumer:list

  • 獲取當前全部消費者信息,包含如下信息:
  • Key 標識
  • Handler 進程類
  • Count 進程數(shù)
  • Consumer 消費者類名
  • Stream Length 當前隊列總長度(不包含Pending列表中的數(shù)量)
  • Delay Set Length 當前延時隊列任務數(shù)
  • Pending List Length 當前Pending列表長度
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| Key         | Handler                                                | Count | Consumer                 | Stream Length | Delay Set Length | Pending List Length |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| SendCode    | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20    | app\queue\test\SendCode  | 1996          | 950              | >=500               |
| SendEmail   | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20    | app\queue\test\SendEmail | 0             | 0                | 0                   |
| SendSmsCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 1     | app\queue\SendSmsCode    | 0             | 0                | 0                   |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+

處理歷史消息

使用場景:

  1. 在極端情況下業(yè)務執(zhí)行完畢并且ack成功,但是刪除消息時出現(xiàn)異常,消息保留在stream中,一般少量數(shù)據(jù)時我們無需在意,但如果堆積數(shù)量過大可能造成內(nèi)存占用和性能問題。
  2. 當你需要處理歷史消息,或者重新處理之前已經(jīng)處理過的消息。
  3. 當你需要對 Stream 中的歷史數(shù)據(jù)進行分析或生成報告。

autoDel屬性為true
時,消息會自動刪除,無法對歷史數(shù)據(jù)進行處理和分析,如果業(yè)務需要對歷史隊列消息進行回溯請設置為false

代碼示例:

這里我們使用了webman自定義腳本
的編寫,可以將腳本加入定時任務中,定期清理或處理歷史消息。

下方代碼只是示例,請確保在測試環(huán)境充分測試。


<?php

use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;

require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';

// 獲取隊列的Redis連接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis擴展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示從 Stream 的最開始讀取
$end = '+';   // 表示讀取到 Stream 的最末尾
$count = 100;  // 指定要讀取的消息數(shù)量

// 讀取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);

$deleteMessageIds = [];

foreach ($messages as $messageId => $message) {

    // 解析原始消息內(nèi)容
    $messageArr = QueueMessage::parseRawMessage($message);

    if (!$messageArr) { // 未知消息
        $deleteMessageIds[] = $messageId;
        continue;
    }

    // 轉(zhuǎn)換為QueueMessage方便操作
    $queueMessage = QueueMessage::createFromArray($messageArr);

    // 通過獲取消息時間戳,如果消息已經(jīng)存在超過1個小時 標記刪除。
    if (time() - $queueMessage->getTimestamp() > 3600) {
        $deleteMessageIds[] = $messageId;
    }

}

// 批量刪除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);

在其他項目投遞消息

目前插件沒有實現(xiàn)在其他項目投遞的標準實現(xiàn),可通過業(yè)務需求開發(fā)隊列提交接口實現(xiàn)。

贊助商