【workbunny】RabbitMQ客戶端

版本計(jì)劃
?? 已發(fā)布2.3.0版本
- 增加--mode=co-queue創(chuàng)建CoQueueBuilder
- 協(xié)程化客戶端,減少客戶端復(fù)雜度,增加易用性
- 與QueueBuilder使用一致,僅協(xié)程化區(qū)別
- 支持workerman v5
常見問題
-
什么時(shí)候使用消息隊(duì)列?
當(dāng)你需要對(duì)系統(tǒng)進(jìn)行解耦、削峰、異步的時(shí)候;如發(fā)送短信驗(yàn)證碼、秒殺活動(dòng)、資產(chǎn)的異步分賬清算等。
-
RabbitMQ和Redis的區(qū)別?
Redis中的Stream的特性同樣適用于消息隊(duì)列,并且也包含了比較完善的ACK機(jī)制,但在一些點(diǎn)上與RabbitMQ存在不同:
- Redis Stream沒有完善的后臺(tái)管理;RabbitMQ擁有較為完善的后臺(tái)管理及Api;
- Redis的持久化策略取舍:默認(rèn)的RDB策略極端情況下存在丟失數(shù)據(jù),AOF策略則需要犧牲一些性能;RabbitMQ持久化方案更多,可對(duì)消息持久化也可對(duì)隊(duì)列持久化;
- RabbitMQ擁有更多的插件可以提供更完善的協(xié)議支持及功能支持;
-
什么時(shí)候使用Redis?什么時(shí)候使用RabbitMQ?
當(dāng)你的隊(duì)列使用比較單一或者比較輕量的時(shí)候,請(qǐng)選用 Redis Stream;當(dāng)你需要一個(gè)比較完整的消息隊(duì)列體系,包括需要利用交換機(jī)來綁定不同隊(duì)列做一些比較復(fù)雜的消息任務(wù)的時(shí)候,請(qǐng)選擇RabbitMQ;
當(dāng)然,如果你的隊(duì)列使用也比較單一,但你需要用到一些管理后臺(tái)相關(guān)系統(tǒng)化的功能的時(shí)候,又不想花費(fèi)太多時(shí)間去開發(fā)的時(shí)候,也可以使用RabbitMQ;因?yàn)镽abbitMQ提供了一整套后臺(tái)管理的體系及 HTTP API 供開發(fā)者兼容到自己的管理后臺(tái)中,不需要再消耗多余的時(shí)間去開發(fā)功能;
注:這里的 輕量 指的是 無須將應(yīng)用中的隊(duì)列服務(wù)獨(dú)立化,該隊(duì)列服務(wù)是該應(yīng)用獨(dú)享的
說明
- 此文檔為2.0,1.0文檔
- 1.0已停止維護(hù)
簡介
RabbitMQ的webman客戶端插件;
- 支持5種消費(fèi)模式:簡單隊(duì)列、workQueue、routing、pub/sub、exchange;
- 支持延遲隊(duì)列(rabbitMQ須安裝插件);
- 異步無阻塞消費(fèi)、異步無阻塞生產(chǎn)、同步阻塞生產(chǎn);
概念
1. Builder
- Builder為隊(duì)列的抽象結(jié)構(gòu),每個(gè)Builder都包含一個(gè)BuilderConfig配置結(jié)構(gòu)對(duì)象
- 當(dāng)前進(jìn)程的所有Builder公用一個(gè)對(duì)象池,對(duì)象池可用于減少連接的創(chuàng)建和銷毀,提升性能
- 當(dāng)前進(jìn)程的所有Builder公用一個(gè)Connection連接對(duì)象池:
- 當(dāng)reuse_connection=false時(shí),Builder之間使用各自的Connection連接對(duì)象
- 當(dāng)reuse_connection=true時(shí),不同Builder復(fù)用同一個(gè)Connection連接對(duì)象
2. Connection
- Connection是抽象的連接對(duì)象,每個(gè)Connection會(huì)創(chuàng)建兩個(gè)TCP連接:
getAsyncClient()
獲取AsyncClient 異步客戶端連接getSyncClient()
獲取SyncClient 同步客戶端連接
- 一個(gè)Connection對(duì)象在RabbitMQ-server中等于兩個(gè)連接
- 所有Builder的Connection連接對(duì)象在Builder的Connection池中進(jìn)行統(tǒng)一管理
- 當(dāng)reuse_connection=true時(shí),Connection對(duì)象在池中的key為空字符串
3. Channel
- Channel是基于RabbitMQ-server的連接對(duì)象的子連接
- Channel的上限默認(rèn)根據(jù)RabbitMQ-server的channel limit配置
- Channel的生命周期與Connection一致,在Connection存續(xù)期間,Channel不會(huì)被銷毀
- Channel池可以啟用/關(guān)閉復(fù)用模式中:
- 當(dāng)reuse_channel=true時(shí),連接會(huì)使用Channel池中閑置通道進(jìn)行發(fā)送,如當(dāng)前不存在閑置通道,則創(chuàng)建新的通道;
- 當(dāng)reuse_channel=false時(shí),連接每次都會(huì)創(chuàng)建新的通道,建議在生產(chǎn)完成后調(diào)用連接關(guān)閉
- AsyncClient和SyncClient互相不共用TCP連接,所以Channel池也不公用
- 可以通過
getChannels
方法獲取Channel對(duì)象池,自行管理釋放
使用
要求
- php >= 8.0
- webman-framework >= 1.5
- rabbitmq-server >= 3.10
安裝
composer require workbunny/webman-rabbitmq
配置
<?php
return [
'enable' => true,
'host' => 'rabbitmq',
'vhost' => '/',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'mechanisms' => 'AMQPLAIN',
'timeout' => 10,
// 重啟間隔
'restart_interval' => 0,
// 心跳間隔
'heartbeat' => 50,
// 心跳回調(diào)
'heartbeat_callback' => function(){
},
// 錯(cuò)誤回調(diào)
'error_callback' => function(Throwable $throwable){
},
// 復(fù)用連接
'reuse_connection' => false,
// 復(fù)用通道
'reuse_channel' => false,
// AMQPS 如需使用AMQPS請(qǐng)取消注釋
// 'ssl' => [
// 'cafile' => 'ca.pem',
// 'local_cert' => 'client.cert',
// 'local_pk' => 'client.key',
// ],
];
QueueBuilder
- 可實(shí)現(xiàn)官網(wǎng)的5種消費(fèi)模式
命令行
- 創(chuàng)建
# 創(chuàng)建一個(gè)擁有單進(jìn)程消費(fèi)者的QueueBuilder
./webman workbunny:rabbitmq-builder test --mode=queue
# 創(chuàng)建一個(gè)擁有4進(jìn)程消費(fèi)者的QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --mode=queue
# 創(chuàng)建一個(gè)擁有單進(jìn)程消費(fèi)者的延遲QueueBuilder
./webman workbunny:rabbitmq-builder test --delayed --mode=queue
# 創(chuàng)建一個(gè)擁有4進(jìn)程消費(fèi)者的延遲QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --delayed --mode=queue
# 在 process/workbunny/rabbitmq 目錄下創(chuàng)建 TestBuilder.php
./webman workbunny:rabbitmq-builder test --mode=queue
# 在 process/workbunny/rabbitmq/project 目錄下創(chuàng)建 TestBuilder.php
./webman workbunny:rabbitmq-builder project/test --mode=queue
# 在 process/workbunny/rabbitmq/project 目錄下創(chuàng)建 TestAllBuilder.php
./webman workbunny:rabbitmq-builder project/testAll --mode=queue
# 延遲同理
- 移除
移除包含了類文件的移除和配置的移除
# 移除Builder
./webman workbunny:rabbitmq-remove test
# 移除延遲Builder
./webman workbunny:rabbitmq-remove test --delayed
# 二級(jí)菜單同理
- 關(guān)閉
關(guān)閉僅對(duì)配置進(jìn)行移除
# 關(guān)閉Builder
./webman workbunny:rabbitmq-remove test --close
# 關(guān)閉延遲Builder
./webman workbunny:rabbitmq-remove test --close --delayed
# 二級(jí)菜單同理
注意
- 創(chuàng)建的Builder類可以手動(dòng)修改調(diào)整
- 為Builder添加進(jìn)process.php的配置可以手動(dòng)修改
- 延遲隊(duì)列需要為 rabbitMQ 安裝 rabbitmq_delayed_message_exchange 插件
- 進(jìn)入 rabbitMQ 的 plugins 目錄下執(zhí)行命令下載插件(以rabbitMQ 3.8.x舉例):
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
- 執(zhí)行安裝命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 進(jìn)入 rabbitMQ 的 plugins 目錄下執(zhí)行命令下載插件(以rabbitMQ 3.8.x舉例):
查看Builder
./webman workbunny:rabbitmq-list
注:當(dāng) Builder 未啟動(dòng)時(shí),handler 與 count 顯示為 --
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| name | file | handler | count | mode |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| test | /var/www/your-project/process/workbunny/rabbitmq/TestBuilder.php | process\workbunny\rabbitmq\TestBuilder | 1 | queue |
| test -d | /var/www/your-project/process/workbunny/rabbitmq/TestBuilderDelayed.php | process\workbunny\rabbitmq\TestBuilderDelayed | 1 | group |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
生產(chǎn)
- 每個(gè)builder各包含一個(gè)連接,使用多個(gè)builder會(huì)創(chuàng)建多個(gè)連接
- 生產(chǎn)消息默認(rèn)不關(guān)閉當(dāng)前連接
- 異步生產(chǎn)的連接與消費(fèi)者共用
1. 同步發(fā)布消息
該方法會(huì)阻塞等待至消息生產(chǎn)成功,返回bool
- 發(fā)布普通消息
注:向延遲隊(duì)列發(fā)布普通消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常
use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;
sync_publish(TestBuilder::instance(), 'abc'); # return bool
- 發(fā)布延遲消息
注:向普通隊(duì)列發(fā)布延遲消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常
use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;
sync_publish(TestBuilder::instance(), 'abc', headers: [
'x-delay' => 10000, # 延遲10秒
]); # return bool
2. 異步發(fā)布消息
該方法不會(huì)阻塞等待,立即返回 React\Promise,
可以利用 React\Promise 進(jìn)行 wait;
也可以純異步不等待,React\Promise 項(xiàng)目地址;
- 發(fā)布普通消息
注:向延遲隊(duì)列發(fā)布普通消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常
use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;
async_publish(TestBuilder::instance(), 'abc'); # return PromiseInterface|bool
- 發(fā)布延遲消息
注:向普通隊(duì)列發(fā)布延遲消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常
use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;
async_publish(TestBuilder::instance(), 'abc', headers: [
'x-delay' => 10000, # 延遲10秒
]); # return PromiseInterface|bool
進(jìn)階
1. 自定義Builder
- 創(chuàng)建自定義Builder需繼承實(shí)現(xiàn)AbstractBuilder;
- onWorkerStart 消費(fèi)進(jìn)程啟動(dòng)時(shí)會(huì)觸發(fā),一般用于實(shí)現(xiàn)基礎(chǔ)消費(fèi)邏輯;
- onWorkerStop 消費(fèi)進(jìn)程結(jié)束時(shí)會(huì)觸發(fā),一般用于回收資源;
- onWorkerReload 消費(fèi)進(jìn)程重載,一般可置空;
- classContent 用于配合命令行自動(dòng)生成BuilderClass;
/**
* Builder 啟動(dòng)時(shí)
*
* @param Worker $worker
* @return void
*/
abstract public function onWorkerStart(Worker $worker): void;
/**
* Builder 停止時(shí)
*
* @param Worker $worker
* @return void
*/
abstract public function onWorkerStop(Worker $worker): void;
/**
* Builder 重加載時(shí)
*
* @param Worker $worker
* @return void
*/
abstract public function onWorkerReload(Worker $worker): void;
/**
* Command 獲取需要?jiǎng)?chuàng)建的類文件內(nèi)容
*
* @param string $namespace
* @param string $className
* @param bool $isDelay
* @return string
*/
abstract public static function classContent(string $namespace, string $className, bool $isDelay): string;
說明
- 生產(chǎn)可用,歡迎 issue 和 PR;
- Message 可以理解為隊(duì)列、交換機(jī)的配置信息;
- 繼承實(shí)現(xiàn) AbstractMessage 可以自定義Message;
- Builder 可通過 Builder->setMessage() 可設(shè)置自定義配置;
- 可使用 SyncClient 或 AsyncClient 自行實(shí)現(xiàn)一些自定義消費(fèi)/自定義生產(chǎn)的功能;