playcatqueue的升級版之playcat/queue-webman

1.2.2
版本
2024-12-19
版本更新時間
63
安裝
2
star
簡介
之所以又重新做了一版是因為之前的版本還有一些功能沒實現(xiàn),但我又想能支持webman和tp+swoole,索性重構(gòu)了代碼。
- 支持Redis單機或集群 (redis >= 5.0)
- 支持Kafka
- 支持RabbitMQ
- 自定義異常與重試流程
- (新)支持延遲消息數(shù)據(jù)持久化
安裝
composer require "playcat/queue-webman"
1.配置
1.1
編輯 config\plugin\playcat\queue\ 目錄下的app.php和process.php。修改相應內(nèi)容為自己環(huán)境使用的配置。
1.2 初始化數(shù)據(jù)庫(只需一次)
php webman timerserver:initdb
2.創(chuàng)建消費任務(wù)
新建一個php的文件并且添加以下內(nèi)容:
<?php
namespace app\queue\playcat;
use Playcat\Queue\Protocols\ConsumerDataInterface;
use Playcat\Queue\Protocols\ConsumerInterface;
class playcatConsumer1 implements ConsumerInterface
{
//任務(wù)名稱,對應發(fā)布消息的名稱
public $queue = 'playcatqueue';
public function consume(ConsumerData $payload)
{
//獲取發(fā)布到隊列時傳入的內(nèi)容
$data = $payload->getQueueData();
...你自己的業(yè)務(wù)邏輯
//休息10s
sleep(10);
echo('done!');
}
}
ConsumerData方法
- getID: 當前消息的id
- getRetryCount(): 當前任務(wù)已經(jīng)重試過的次數(shù)
- getQueueData(): 當前任務(wù)傳入的參數(shù)
- getChannel(): 當前所執(zhí)行的任務(wù)名稱
-
- -
將上面編寫好的任務(wù)文件保存項目中'app/queue/playcat/'下(如果目錄不存在則自己手動創(chuàng)建)
啟動服務(wù):
啟動:
php start.php start
重載:可在不重啟服務(wù)的情況下更新業(yè)務(wù)
php start.php reload
停止:
php start.php stop
如果沒有錯誤出現(xiàn)則表示啟動完成
添加任務(wù)并且提交到隊列中
use Playcat\Queue\Manager;
use Playcat\Queue\Protocols\ProducerData;
//使用協(xié)程的方式,如果需要并行數(shù)據(jù)發(fā)布需要自行實現(xiàn)Manager的連接池
//即時消費消息
$payload = new ProducerData();
//對應消費隊列里的任務(wù)名稱
$payload->setChannel('test');
//對應消費隊列里的任務(wù)使用的數(shù)據(jù)
$payload->setQueueData([1,2,3,4]);
//推入隊列并且獲取消息id
$id = Manager::getInstance()->push($payload);
//延遲消費消息
$payload_delay = new ProducerData();
$payload_delay->setChannel('test');
$payload_delay->setQueueData([6,7,8,9]);
//設(shè)置60秒后執(zhí)行的任務(wù)
$payload_delay->setDelayTime(60);
//推入隊列并且獲取消息id
$id = Manager::getInstance()->push($payload_delay);
//取消延遲消息
Manager::getInstance()->del($id);
ProducerData方法
- setChannel: 設(shè)置推入消息的隊列名稱
- setQueueData: 設(shè)置傳入到消費任務(wù)的數(shù)據(jù)
- setDelayTime: 設(shè)置延遲時間(秒)
-
- -
異常與重試機制
任務(wù)在執(zhí)行過程中未拋出異常則默認執(zhí)行成功,否則則進入重試階段.
重試次數(shù)和時間由配置控制,重試間隔時間為當前重試次數(shù)的冪函數(shù)。
Playcat\Queue\Exceptions\DontRetry異常會忽略掉重試
其它
基于tp和swoole的隊列系統(tǒng)
playcat-queue-tpswoole
QQ:318274085