redis隊列插件

簡介
基于Redis的消息隊列,支持消息延遲處理。
安裝
composer require webman/redis-queue
配置文件
redis配置文件自動生成在 config/plugin/webman/redis-queue/redis.php
,內(nèi)容類似如下:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // 密碼,可選參數(shù)
'db' => 0, // 數(shù)據(jù)庫
'max_attempts' => 5, // 消費失敗后,重試次數(shù)
'retry_seconds' => 5, // 重試間隔,單位秒
]
],
];
消費失敗重試
如果消費失敗(發(fā)生了異常),則消息會放入延遲隊列,等待下次重試。重試次數(shù)通過參數(shù) max_attempts
控制,重試間隔由
retry_seconds
和 max_attempts
共同控制。比如max_attempts
為5,retry_seconds
為10,第1次重試間隔為1*10
秒,第2次重試時間間隔為 2*10秒
,第3次重試時間間隔為3*10秒
,以此類推直到重試5次。如果超過了max_attempts
設(shè)置測重試次數(shù),則消息放入key為{redis-queue}-failed
的失敗隊列。
投遞消息(同步)
注意
需要webman/redis >= 1.2.0,依賴 redis擴(kuò)展
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// 隊列名
$queue = 'send-mail';
// 數(shù)據(jù),可以直接傳數(shù)組,無需序列化
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// 投遞消息
Redis::send($queue, $data);
// 投遞延遲消息,消息會在60秒后處理
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
投遞成功Redis::send()
返回true,否則返回false或者拋出異常。
提示
延遲隊列消費時間可能會出現(xiàn)誤差,例如消費速度小于生產(chǎn)速度導(dǎo)致隊列積壓,進(jìn)而導(dǎo)致消費延遲,緩解辦法是多開一些消費進(jìn)程。
投遞消息(異步)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// 隊列名
$queue = 'send-mail';
// 數(shù)據(jù),可以直接傳數(shù)組,無需序列化
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// 投遞消息
Client::send($queue, $data);
// 投遞延遲消息,消息會在60秒后處理
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
沒有返回值,它屬于異步推送,它不保證消息%100送達(dá)redis。
提示
Client::send()
原理是在本地內(nèi)存建立一個內(nèi)存隊列,異步將消息同步到redis(同步速度很快,每秒大概1萬筆消息)。如果進(jìn)程重啟,恰好本地內(nèi)存隊列里數(shù)據(jù)沒有同步完畢,會造成消息丟失。Client::send()
異步投遞適合投遞不重要的消息。提示
Client::send()
是異步的,它只能在workerman的運行環(huán)境中使用,命令行腳本請使用同步接口Redis::send()
在其他項目投遞消息
有時候你需要在其它項目中投遞消息并且無法使用webman\redis-queue
,則可以參考以下函數(shù)向隊列投遞消息。
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
其中,參數(shù)$redis
為redis實例。例如redis擴(kuò)展用法類似如下:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
消費
消費進(jìn)程配置文件在 config/plugin/webman/redis-queue/process.php
。
消費者目錄在 app/queue/redis/
下。
執(zhí)行命令php webman redis-queue:consumer my-send-mail
則會生成文件app/queue/redis/MyMailSend.php
提示
如果命令不存在也可以手動生成
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// 要消費的隊列名
public $queue = 'send-mail';
// 連接名,對應(yīng) plugin/webman/redis-queue/redis.php 里的連接`
public $connection = 'default';
// 消費
public function consume($data)
{
// 無需反序列化
var_export($data); // 輸出 ['to' => 'tom@gmail.com', 'content' => 'hello']
}
}
注意
消費過程中沒有拋出異常和Error視為消費成功,否則消費失敗,進(jìn)入重試隊列。
redis-queue沒有ack機(jī)制,你可以把它看作是自動ack(沒有產(chǎn)生異?;駿rror)。如果消費過程中想標(biāo)記當(dāng)前消息消費不成功,可以手動拋出異常,讓當(dāng)前消息進(jìn)入重試隊列。這實際上和ack機(jī)制沒有區(qū)別。提示
消費者支持多服務(wù)器多進(jìn)程,并且同一條消息不會被重復(fù)消費。消費過的消息會自動從隊列刪除,無需手動刪除。提示
消費進(jìn)程可以同時消費多種不同的隊列,新增隊列不需要修改process.php
中的配置,新增隊列消費者時只需要在app/queue/redis
下新增對應(yīng)的Consumer
類即可,并用類屬性$queue
指定要消費的隊列名提示
windows用戶需要執(zhí)行php windows.php 啟動webman,否則不會啟動消費進(jìn)程
自定義失敗處理
當(dāng)開發(fā)者想介入消費失敗后的處理流程時,可以實現(xiàn)onConsumeFailure
方法,例如
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
public $queue = 'send-mail';
public $connection = 'default';
// 消費
public function consume($data)
{
// 省略
}
// 消費失敗時
public function onConsumeFailure(\Throwable $exception, $package)
{
var_export($package);
// 直接更改消息隊列數(shù)據(jù)結(jié)構(gòu),將最大重試次數(shù)max_attempts字段設(shè)置為0,即不再重試。
$package['max_attempts'] = 0;
// 除此之外還可更改data字段(也就是consume方法中的$data)
// 返回更改后的數(shù)據(jù)結(jié)構(gòu)
return $package;
}
}
為某些隊列單獨設(shè)置消費進(jìn)程
默認(rèn)情況下,所有的消費者共用相同的消費進(jìn)程。但有時我們需要將一些隊列的消費獨立出來,例如消費慢的業(yè)務(wù)放到一組進(jìn)程中消費,消費快的業(yè)務(wù)放到另外一組進(jìn)程消費。為此我們可以將消費者分為兩個目錄,例如 app_path() . '/queue/redis/fast'
和 app_path() . '/queue/redis/slow'
(注意消費類的命名空間需要做相應(yīng)的更改),則配置如下:
return [
...這里省略了其它配置...
'redis_consumer_fast' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// 消費者類目錄
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// 消費者類目錄
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
這樣快業(yè)務(wù)消費者放到queue/redis/fast
目錄下,慢業(yè)務(wù)消費者放到queue/redis/slow
目錄下達(dá)到給隊列指定消費進(jìn)程的目的。
多redis配置
配置
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // 密碼,字符串類型,可選參數(shù)
'db' => 0, // 數(shù)據(jù)庫
'max_attempts' => 5, // 消費失敗后,重試次數(shù)
'retry_seconds' => 5, // 重試間隔,單位秒
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // 密碼,字符串類型,可選參數(shù)
'db' => 0, // 數(shù)據(jù)庫
'max_attempts' => 5, // 消費失敗后,重試次數(shù)
'retry_seconds' => 5, // 重試間隔,單位秒
]
],
];
注意配置里增加了一個other
為key的redis配置
多redis投遞消息
// 向 `default` 為key的隊列投遞消息
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// 等同于
Client::send($queue, $data);
Redis::send($queue, $data);
// 向 `other` 為key的隊列投遞消息
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
多redis消費
消費配置里other
為key的隊列投遞消息
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// 要消費的隊列名
public $queue = 'send-mail';
// === 這里設(shè)置為other,代表消費配置里other為key的隊列 ===
public $connection = 'other';
// 消費
public function consume($data)
{
// 無需反序列化
var_export($data);
}
}
常見問題
為什么會有報錯 Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
這個錯誤只會存在于異步投遞接口Client::send()
中。異步投遞首先會將消息保存在本地內(nèi)存中,當(dāng)進(jìn)程空閑時將消息發(fā)送給redis。如果redis接收速度慢于消息生產(chǎn)速度,或者進(jìn)程一直忙于其它業(yè)務(wù)沒有足夠的時間將內(nèi)存的消息同步給redis,就會導(dǎo)致消息擠壓。如果有消息擠壓超過600秒,就會觸發(fā)此錯誤。
解決方案:投遞消息使用同步投遞接口Redis::send()
。