国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

【workbunny】RabbitMQ客戶端

2.4.1 版本
2025-04-28 版本更新時(shí)間
1617 安裝
26 star

Latest Stable Version Total Downloads License PHP Version Require

版本計(jì)劃

?? 已發(fā)布2.3.0版本

  1. 增加--mode=co-queue創(chuàng)建CoQueueBuilder
    1. 協(xié)程化客戶端,減少客戶端復(fù)雜度,增加易用性
    2. 與QueueBuilder使用一致,僅協(xié)程化區(qū)別
  2. 支持workerman v5

常見問題

  1. 什么時(shí)候使用消息隊(duì)列?

    當(dāng)你需要對(duì)系統(tǒng)進(jìn)行解耦、削峰、異步的時(shí)候;如發(fā)送短信驗(yàn)證碼、秒殺活動(dòng)、資產(chǎn)的異步分賬清算等。

  2. RabbitMQ和Redis的區(qū)別?

    Redis中的Stream的特性同樣適用于消息隊(duì)列,并且也包含了比較完善的ACK機(jī)制,但在一些點(diǎn)上與RabbitMQ存在不同:

    • Redis Stream沒有完善的后臺(tái)管理;RabbitMQ擁有較為完善的后臺(tái)管理及Api;
    • Redis的持久化策略取舍:默認(rèn)的RDB策略極端情況下存在丟失數(shù)據(jù),AOF策略則需要犧牲一些性能;RabbitMQ持久化方案更多,可對(duì)消息持久化也可對(duì)隊(duì)列持久化;
    • RabbitMQ擁有更多的插件可以提供更完善的協(xié)議支持及功能支持;
  3. 什么時(shí)候使用Redis?什么時(shí)候使用RabbitMQ?

    當(dāng)你的隊(duì)列使用比較單一或者比較輕量的時(shí)候,請(qǐng)選用 Redis Stream;當(dāng)你需要一個(gè)比較完整的消息隊(duì)列體系,包括需要利用交換機(jī)來綁定不同隊(duì)列做一些比較復(fù)雜的消息任務(wù)的時(shí)候,請(qǐng)選擇RabbitMQ;

    當(dāng)然,如果你的隊(duì)列使用也比較單一,但你需要用到一些管理后臺(tái)相關(guān)系統(tǒng)化的功能的時(shí)候,又不想花費(fèi)太多時(shí)間去開發(fā)的時(shí)候,也可以使用RabbitMQ;因?yàn)镽abbitMQ提供了一整套后臺(tái)管理的體系及 HTTP API 供開發(fā)者兼容到自己的管理后臺(tái)中,不需要再消耗多余的時(shí)間去開發(fā)功能;

    注:這里的 輕量 指的是 無須將應(yīng)用中的隊(duì)列服務(wù)獨(dú)立化,該隊(duì)列服務(wù)是該應(yīng)用獨(dú)享的

說明

  • 此文檔為2.0,1.0文檔
  • 1.0已停止維護(hù)

簡介

RabbitMQ的webman客戶端插件;

  1. 支持5種消費(fèi)模式:簡單隊(duì)列、workQueue、routing、pub/sub、exchange;
  2. 支持延遲隊(duì)列(rabbitMQ須安裝插件);
  3. 異步無阻塞消費(fèi)、異步無阻塞生產(chǎn)、同步阻塞生產(chǎn);

概念

1. Builder

  • Builder為隊(duì)列的抽象結(jié)構(gòu),每個(gè)Builder都包含一個(gè)BuilderConfig配置結(jié)構(gòu)對(duì)象
  • 當(dāng)前進(jìn)程的所有Builder公用一個(gè)對(duì)象池,對(duì)象池可用于減少連接的創(chuàng)建和銷毀,提升性能
  • 當(dāng)前進(jìn)程的所有Builder公用一個(gè)Connection連接對(duì)象池:
    • 當(dāng)reuse_connection=false時(shí),Builder之間使用各自的Connection連接對(duì)象
    • 當(dāng)reuse_connection=true時(shí),不同Builder復(fù)用同一個(gè)Connection連接對(duì)象

2. Connection

  • Connection是抽象的連接對(duì)象,每個(gè)Connection會(huì)創(chuàng)建兩個(gè)TCP連接:
    • getAsyncClient()獲取AsyncClient 異步客戶端連接
    • getSyncClient()獲取SyncClient 同步客戶端連接
  • 一個(gè)Connection對(duì)象在RabbitMQ-server中等于兩個(gè)連接
  • 所有Builder的Connection連接對(duì)象在Builder的Connection池中進(jìn)行統(tǒng)一管理
  • 當(dāng)reuse_connection=true時(shí),Connection對(duì)象在池中的key為空字符串

3. Channel

  • Channel是基于RabbitMQ-server的連接對(duì)象的子連接
  • Channel的上限默認(rèn)根據(jù)RabbitMQ-server的channel limit配置
  • Channel的生命周期與Connection一致,在Connection存續(xù)期間,Channel不會(huì)被銷毀
  • Channel池可以啟用/關(guān)閉復(fù)用模式中:
    • 當(dāng)reuse_channel=true時(shí),連接會(huì)使用Channel池中閑置通道進(jìn)行發(fā)送,如當(dāng)前不存在閑置通道,則創(chuàng)建新的通道;
    • 當(dāng)reuse_channel=false時(shí),連接每次都會(huì)創(chuàng)建新的通道,建議在生產(chǎn)完成后調(diào)用連接關(guān)閉
  • AsyncClient和SyncClient互相不共用TCP連接,所以Channel池也不公用
  • 可以通過getChannels方法獲取Channel對(duì)象池,自行管理釋放

使用

要求

  • php >= 8.0
  • webman-framework >= 1.5
  • rabbitmq-server >= 3.10

安裝

composer require workbunny/webman-rabbitmq

配置

<?php
return [
    'enable' => true,

    'host'               => 'rabbitmq',
    'vhost'              => '/',
    'port'               => 5672,
    'username'           => 'guest',
    'password'           => 'guest',
    'mechanisms'         => 'AMQPLAIN',
    'timeout'            => 10,
    // 重啟間隔
    'restart_interval'   => 0,
    // 心跳間隔
    'heartbeat'          => 50,
    // 心跳回調(diào)
    'heartbeat_callback' => function(){
    },
    // 錯(cuò)誤回調(diào)
    'error_callback'     => function(Throwable $throwable){
    },
    // 復(fù)用連接
    'reuse_connection'   => false,
    // 復(fù)用通道
    'reuse_channel'      => false,
    // AMQPS 如需使用AMQPS請(qǐng)取消注釋
//    'ssl'                => [
//        'cafile'      => 'ca.pem',
//        'local_cert'  => 'client.cert',
//        'local_pk'    => 'client.key',
//    ],
];

QueueBuilder

  • 可實(shí)現(xiàn)官網(wǎng)的5種消費(fèi)模式

命令行

  • 創(chuàng)建
# 創(chuàng)建一個(gè)擁有單進(jìn)程消費(fèi)者的QueueBuilder
./webman workbunny:rabbitmq-builder test --mode=queue
# 創(chuàng)建一個(gè)擁有4進(jìn)程消費(fèi)者的QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --mode=queue

# 創(chuàng)建一個(gè)擁有單進(jìn)程消費(fèi)者的延遲QueueBuilder
./webman workbunny:rabbitmq-builder test --delayed --mode=queue
# 創(chuàng)建一個(gè)擁有4進(jìn)程消費(fèi)者的延遲QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --delayed --mode=queue

# 在 process/workbunny/rabbitmq 目錄下創(chuàng)建 TestBuilder.php
./webman workbunny:rabbitmq-builder test --mode=queue
# 在 process/workbunny/rabbitmq/project 目錄下創(chuàng)建 TestBuilder.php
./webman workbunny:rabbitmq-builder project/test --mode=queue
# 在 process/workbunny/rabbitmq/project 目錄下創(chuàng)建 TestAllBuilder.php
./webman workbunny:rabbitmq-builder project/testAll --mode=queue
# 延遲同理
  • 移除

移除包含了類文件的移除和配置的移除

# 移除Builder
./webman workbunny:rabbitmq-remove test
# 移除延遲Builder
./webman workbunny:rabbitmq-remove test --delayed

# 二級(jí)菜單同理
  • 關(guān)閉

關(guān)閉僅對(duì)配置進(jìn)行移除

# 關(guān)閉Builder
./webman workbunny:rabbitmq-remove test --close
# 關(guān)閉延遲Builder
./webman workbunny:rabbitmq-remove test --close --delayed

# 二級(jí)菜單同理

注意

  • 創(chuàng)建的Builder類可以手動(dòng)修改調(diào)整
  • 為Builder添加進(jìn)process.php的配置可以手動(dòng)修改
  • 延遲隊(duì)列需要為 rabbitMQ 安裝 rabbitmq_delayed_message_exchange 插件
    1. 進(jìn)入 rabbitMQ 的 plugins 目錄下執(zhí)行命令下載插件(以rabbitMQ 3.8.x舉例):
      wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
    2. 執(zhí)行安裝命令
      rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看Builder

./webman workbunny:rabbitmq-list

注:當(dāng) Builder 未啟動(dòng)時(shí),handler 與 count 顯示為 --

+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| name     | file                                                                    | handler                                         | count | mode  |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| test     | /var/www/your-project/process/workbunny/rabbitmq/TestBuilder.php        | process\workbunny\rabbitmq\TestBuilder          | 1     | queue |
| test -d  | /var/www/your-project/process/workbunny/rabbitmq/TestBuilderDelayed.php | process\workbunny\rabbitmq\TestBuilderDelayed   | 1     | group |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+

生產(chǎn)

  • 每個(gè)builder各包含一個(gè)連接,使用多個(gè)builder會(huì)創(chuàng)建多個(gè)連接
  • 生產(chǎn)消息默認(rèn)不關(guān)閉當(dāng)前連接
  • 異步生產(chǎn)的連接與消費(fèi)者共用

1. 同步發(fā)布消息

該方法會(huì)阻塞等待至消息生產(chǎn)成功,返回bool

  • 發(fā)布普通消息

注:向延遲隊(duì)列發(fā)布普通消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc'); # return bool
  • 發(fā)布延遲消息

注:向普通隊(duì)列發(fā)布延遲消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc', headers: [
    'x-delay' => 10000, # 延遲10秒
]); # return bool

2. 異步發(fā)布消息

該方法不會(huì)阻塞等待,立即返回 React\Promise,
可以利用 React\Promise 進(jìn)行 wait;
也可以純異步不等待,React\Promise 項(xiàng)目地址;

  • 發(fā)布普通消息

注:向延遲隊(duì)列發(fā)布普通消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc'); # return PromiseInterface|bool
  • 發(fā)布延遲消息

注:向普通隊(duì)列發(fā)布延遲消息會(huì)拋出一個(gè) WebmanRabbitMQException 異常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc', headers: [
    'x-delay' => 10000, # 延遲10秒
]); # return PromiseInterface|bool

進(jìn)階

1. 自定義Builder

  • 創(chuàng)建自定義Builder需繼承實(shí)現(xiàn)AbstractBuilder;
    • onWorkerStart 消費(fèi)進(jìn)程啟動(dòng)時(shí)會(huì)觸發(fā),一般用于實(shí)現(xiàn)基礎(chǔ)消費(fèi)邏輯;
    • onWorkerStop 消費(fèi)進(jìn)程結(jié)束時(shí)會(huì)觸發(fā),一般用于回收資源;
    • onWorkerReload 消費(fèi)進(jìn)程重載,一般可置空;
    • classContent 用于配合命令行自動(dòng)生成BuilderClass;
    /**
     * Builder 啟動(dòng)時(shí)
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStart(Worker $worker): void;

    /**
     * Builder 停止時(shí)
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStop(Worker $worker): void;

    /**
     * Builder 重加載時(shí)
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerReload(Worker $worker): void;

    /**
     * Command 獲取需要?jiǎng)?chuàng)建的類文件內(nèi)容
     *
     * @param string $namespace
     * @param string $className
     * @param bool $isDelay
     * @return string
     */
    abstract public static function classContent(string $namespace, string $className, bool $isDelay): string;

說明

  • 生產(chǎn)可用,歡迎 issue 和 PR;
  • Message 可以理解為隊(duì)列、交換機(jī)的配置信息;
  • 繼承實(shí)現(xiàn) AbstractMessage 可以自定義Message;
  • Builder 可通過 Builder->setMessage() 可設(shè)置自定義配置;
  • 可使用 SyncClientAsyncClient 自行實(shí)現(xiàn)一些自定義消費(fèi)/自定義生產(chǎn)的功能;
贊助商