通過綜合分析、研究和探索workerman和PhpAmqpLibr相關(guān)手冊,經(jīng)過長期的實踐,現(xiàn)分享一套基于workerman的rabbitmq客戶端生產(chǎn)者和消費者代碼,供大家測試,使用。
rabbitmq_productor.php
<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
//開啟進程數(shù)量
$worker->count = 4;
$worker->name = "rabbitmq_productor";
$date = date("Y-m-d");
Worker::$pidFile = "var/mq_service_productor.pid";
Worker::$logFile = "var/mq_service_productor_logFile.log";
Worker::$stdoutFile = "var/mq_service_productor_stdout.log";
$worker->onWorkerStart = function () {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$rabbitmq_exchange_name = "exchange_name";
$rabbitmq_queueName = "queuePrefix_QueueName";
// 連接 rabbitmq 服務(wù)
$rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
// 獲取信道
$rabbit_channel = $rabbit_connection->channel();
//聲明創(chuàng)建交換機
$rabbit_channel->exchange_declare( $rabbitmq_exchange_name , 'topic', false, true, false);
// 聲明創(chuàng)建隊列
$rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);
// 綁定隊列
$rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);
//可以修改時間間隔,如果為0.002秒,則每秒產(chǎn)生500*4=2000條
Timer::add( 0.002 , function() {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
//需要向rabbitmq隊列投遞消息的內(nèi)容,通常為數(shù)組,經(jīng)過json轉(zhuǎn)換再發(fā)送
$data_all = array(
'name' => "張三",
'time' => time(),
);
$data_all_out_json = json_encode($data_all , JSON_UNESCAPED_UNICODE );
$data_all_out_msg = new AMQPMessage($data_all_out_json, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
//向隊列里面寫內(nèi)容
@$rabbit_channel->basic_publish($data_all_out_msg , $rabbitmq_exchange_name , $rabbitmq_queueName);
});
};
Worker::runAll();
rabbitmq_comsumer.php
<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
//開啟進程數(shù)量
$worker->count = 10;
$worker->name = "rabbitmq_comsumer";
$date = date("Y-m-d");
Worker::$pidFile = "var/rabbitmq_comsumer.pid";
Worker::$logFile = "var/rabbitmq_comsumer_logFile.log";
Worker::$stdoutFile = "var/rabbitmq_comsumer_stdout.log";
$worker->onWorkerStart = function () {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$rabbitmq_exchange_name = "exchange_name";
$rabbitmq_queueName = "queuePrefix_QueueName";
// 連接 rabbitmq 服務(wù)
$rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
// 獲取信道
$rabbit_channel = $rabbit_connection->channel();
// 聲明隊列
$rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);
// 綁定隊列
$rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);
// 消費者訂閱隊列
$rabbit_channel->basic_consume($rabbitmq_queueName , '', false, false, false, false,
function ($msg){
global $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$data_all_str = $msg->body;
// 消息確認,表明已經(jīng)收到這條信息
@$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//echo "{$data_all_str}\n";
//這里是業(yè)務(wù)處理邏輯
//如果這條消息處理失敗,你可以在這里將其再次放回消息隊列(最好給消息做個放回去的次數(shù)判斷,避免無限失敗和無限循環(huán))
});
//這里是重點,網(wǎng)上很多教程起的使用while死循環(huán)容易導(dǎo)致程序異步代碼無法執(zhí)行,這種方法能避免
//按照每個進程每秒處理1000條來設(shè)定定時器,每個進程每秒消費1000條,4個進程每秒消費4000條,經(jīng)過實際驗證,將時間改小也無法提升單個進程的處理速度
//實際測試,4個進程每秒的消費能力有4000左右,可以滿足很多中小型系統(tǒng)的應(yīng)用,如果想提升系統(tǒng)處理能力,
//可以增加消費者進程數(shù)量來解決,比如我將進程數(shù)量提升到10個,每秒處理能力約為1萬
//這個機制,希望能力更強的你來進行優(yōu)化
Timer::add( 0.0001 , function() {
global $rabbit_channel;
if( count($rabbit_channel->callbacks) > 0 ){
$rabbit_channel->wait();
}
});
};
Worker::runAll();
感謝分享
@chaz6chez,安裝測試了一下,發(fā)現(xiàn)每創(chuàng)建一個隊列,默認創(chuàng)建了一個交換機,請教一下同一類型為何不使用同一個交換機呢?對于已經(jīng)創(chuàng)建好的隊列,如何指定交換機和隊列名稱呢。還有就是消息消費失敗重試如何操作比較好呢?
@doit FastBuilder是一個實現(xiàn)點對點消費模式的消費隊列Builder,在這個模型里就是“N個生產(chǎn)者對應(yīng)一個業(yè)務(wù)對應(yīng)一個交換機對應(yīng)一個隊列對應(yīng)N個消費者”;簡單講就是一個業(yè)務(wù)對應(yīng)一條隊列。這樣的好處是所有業(yè)務(wù)是隔離的,業(yè)務(wù)對于rabbitMQ是透明的,它僅僅只做隊列的分發(fā)而已。
FastBuilder默認以ClassName為名,也有默認的消費失敗重試方式,通常來說有兩種重試機制,隊頭阻塞重試,回到隊尾重試;
如果要實現(xiàn)其他消費模式的話,可以繼承Builder,自行實現(xiàn)即可
666,感謝分享,贊一個,有機會試用一下
另外,對于其他方面,說一下個人愚見(跟樓主這個沒有太大關(guān)系)
很多時候,當(dāng)遇到的確需要使用MQ的情況,中小型公司,真的不會花額外的成本去用專業(yè)的MQ,最多用redis
我知道,可能很多人會說,這樣很操蛋,但事實上,有很多公司,就是這樣,成本能低就低
但redis目前有一個很大的問題,目前的webman隊列,是使用的普通的list結(jié)構(gòu),這個是沒有ack機制的
這對于某些對消息可靠性有比較高的要求的情況,就很尷尬
我們目前使用webman的redis隊列插件,都是用在對消息可靠性要求不是特別高的場景,比如通知等
我有個想法,基于redis stream 寫一套類似的消息隊列插件
這樣就可以真正滿足實際需求了
有時間一定寫一個出來,相信這個可能會更加實用
大佬,require_once ("./Lib_global.php");這個是什么文件,用到了嗎,能發(fā)一下嗎?
一些預(yù)定義的變量,設(shè)置PHP運行環(huán)境等
<?php
define('START_TIME', microtime(true));
define('START_MEM', memory_get_usage());
define('DS', DIRECTORY_SEPARATOR);
//defined('ROOT_PATH') or define('ROOT_PATH', dirname(realpath(APP_PATH)) . DS);
defined('ROOT_PATH') or define('ROOT_PATH', dirname($_SERVER['SCRIPT_FILENAME']) . DS);
defined('LOG_PATH') or define('LOG_PATH', ROOT_PATH . 'Log' . DS);
// 環(huán)境常量
define('IS_CLI', PHP_SAPI == 'cli' ? true : false);
define('IS_WIN', strpos(PHP_OS, 'WIN') !== false);
$config = array();
$global = array();
// do NOT run this script through a web browser
if (!isset($_SERVER['argv'][0]) || isset($_SERVER['REQUEST_METHOD']) || isset($_SERVER['REMOTE_ADDR'])) {
die('<br><strong>This script is only meant to run at the command line.</strong>');
}
/ let PHP run just as long as it has to /
ini_set('max_execution_time', '0');
//這個必須有
ini_set('memory_limit','320M');
//error_reporting('E_ALL');
/ this should be auto-detected, set it manually if needed /
$config["server_os"] = (strstr(PHP_OS, "WIN")) ? "win32" : "unix";
$config['root_path'] = ROOT_PATH;
$config['logPath'] = LOG_PATH;
chdir($config['root_path']);
分享一個進一步優(yōu)化的方法,不使用Timer驅(qū)動,使用Swoole協(xié)程方式
//全局里面加個:
Worker::$eventLoopClass = 'Workerman\Events\Swoole'; //將事件引擎替換為Swoole,前提是安裝了Swoole,我安裝的是4.8.12
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]); //將sleep一鍵協(xié)程化
Swoole\Coroutine::set(['enable_deadlock_check' => false]); //取消時間死鎖檢查機制,避免報錯
將原來的以下代碼進行替換
/*
//按照每個進程每秒處理1萬條來設(shè)定定時器
Timer::add( 0.001 , function() {
global $rabbit_channel;
if( count($rabbit_channel->callbacks) > 0 ){
$rabbit_channel->wait();
}
});
*/
//替換為:
// 開始消費
while (count($rabbit_channel->callbacks)) {
$rabbit_channel->wait();
usleep(1000);
}
分享一下我寫的rabbitmq客戶端類庫,支持定時與服務(wù)器握手,支持發(fā)生錯誤時進行生重連
<?php
/**
//composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
class rabbitmq_client {
public $connection = "";
public $channel = "";
public $is_connected = false ;
public $exchange_name = "" ;
public $queen_name = "";
public $comsume_callback = null;
public $config_option = array(
'host' => "127.0.0.1",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'exchange_name' => 'default_exchange_name',
'queen_name' => 'default_queen_name',
);
public function __construct($option = array()){
$this->config_option['host'] = $option['host'];
$this->config_option['port'] = $option['port'];
$this->config_option['user'] = $option['user'];
$this->config_option['password'] = $option['password'];
$this->exchange_name = $option['exchange_name'];
$this->queen_name = $option['queen_name'];
}
function app_log($log){
//將日志信息發(fā)送給日志服務(wù)器
$ts = round(microtime(true) - time() , 6);
@list($ts1 , $ts2) = @explode("." , $ts);
$logData = "{$ts1}.{$ts2} {$log}";
//logToFile($logData);
if(function_exists("logToScreen") == true){
logToScreen($logData , true);
}else{
echo $logData."\n";
}
}
public function connect(){
try{
$this->connection = new AMQPStreamConnection(
$this->config_option['host'] ,
$this->config_option['port'] ,
$this->config_option['user'] ,
$this->config_option['password'] ,
'/' ,
false ,
'AMQPLAIN' ,
null,
'en_US' ,
3.0 ,
3.0 ,
null ,
true ,
60
);
if( $this->connection ->isConnected() == true){
$this->channel = $this->connection->channel();
//聲明交換機
$this->channel->exchange_declare( $this->exchange_name , 'topic', false, true, false);
// 聲明隊列
$this->channel->queue_declare( $this->queen_name , false, true, false, false);
// 綁定隊列
$this->channel->queue_bind($this->queen_name , $this->exchange_name , $this->queen_name );
$this->is_connected = true;
$this->app_log("rabbitmq connected");
return true;
}
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
}
return false;
}
public function reconnect(){
if( $this->is_connected == false ){
if( $this->connect() == true ){
//重新連接到服務(wù)器
$this->is_connected = true;
return true;
}
}
return false;
}
/**
* @return void
*
* 向服務(wù)器發(fā)送
*/
function write_heartbeat(){
if($this->is_connected == true){
try{
//app_log("heartbeat");
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->connection->write($pkt->getvalue());
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
}else{
// false
if( $this->connect() == true ){
//嘗試連接到服務(wù)器
$this->is_connected = true;
}
}
}
/**
* @param $data
* @param $queen_name
* @param $is_persistent
* @param $is_debug
* @return void
*/
function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
$delivery_mod = AMQPMessage::DELIVERY_MODE_PERSISTENT;
if($is_persistent == false){
$delivery_mod = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;
}
$exchange_name = "";
if(strlen($exchange_name_input) > 0){
$exchange_name = $exchange_name_input;
}else{
$exchange_name = $this->exchange_name;
}
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->exchange_name;
}
$rabbit_msg = new AMQPMessage($data , ['content_type'=>'text/plain','delivery_mode'=>$delivery_mod]); //定義消息
try{
// 發(fā)送消息
$this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
if( $this->reconnect() == true ){
$this->is_connected = true;
}
}
}
//在做消費時,對流量進行控制,防止出現(xiàn)丟數(shù)據(jù)
function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
$this->channel->basic_qos( $prefetch_size , $prefetch_count ,false); //當(dāng)有消息在處理時不要發(fā)過來
}
/*
function comsume_callback($msg){
//收到MQ消息
$message_body = $msg->body;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//echo "[x] Received ", $message_body, "\n";
//redis_add_statistic( $redis , "rabbitmq:qos_test_consumption" , 0.1);
}
*/
/*
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
* */
function comsume_swoole_go( $queen_name_input = ""){
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->queen_name;
}
if($this->is_connected == true) {
go(function () {
// 消費者訂閱隊列
try {
if( !$this->comsume_callback ){
$this->app_log("function comsume_callback must be set");
return false;
}
$this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
} catch (Exception $e) {
$this->app_log("error catched :" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
// 開始消費
try {
/*
while ( count($this->channel->callbacks) ) {
$this->channel->wait();
usleep(1);
}*/
while ( $this->channel->is_consuming() ) {
$this->channel->wait();
usleep(1);
}
} catch (Exception $e) {
app_log("error catched when consuming:" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
});
}else{
// false
$this->connect();
}
}
function comsume( $queen_name_input = ""){
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->queen_name;
}
if($this->is_connected == true) {
// 消費者訂閱隊列
try {
if( !$this->comsume_callback ){
$this->app_log("function comsume_callback must be set");
return false;
}
$this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
} catch (Exception $e) {
$this->app_log("error catched :" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
// 開始消費
try {
/*
while ( count($this->channel->callbacks) ) {
$this->channel->wait();
usleep(1);
}*/
while ( $this->channel->is_consuming() ) {
$this->channel->wait();
}
} catch (Exception $e) {
app_log("error catched when consuming:" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
}else{
// false
$this->connect();
}
}
}
生產(chǎn)者:
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
include_once("./Lib_global.php");
include_once("./Lib_functions_rabbitmq.php");
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Swoole\Coroutine;
$worker = new Worker();
//開啟進程數(shù)量
$worker->count = 2;
$processName = "test_mq_pub";
$worker->name = $processName;
$date_ymd = date("Y-m-d");
Worker::$pidFile = ROOT_PATH."var/{$processName}.pid";
Worker::$logFile = ROOT_PATH."var/{$processName}_logFile.log";
Worker::$stdoutFile = ROOT_PATH."var/{$processName}_stdout.log";
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
$redis = "";
$is_debug = false; //全局配置,是否開啟調(diào)試模式
$rabbitmq_client = "";
function app_log($log){
global $workerId;
//將日志信息發(fā)送給日志服務(wù)器
$ts = round(microtime(true) - time() , 6);
@list($ts1 , $ts2) = @explode("." , $ts);
//logToFile(".{$ts2} [$workerId] ".$log);
logToScreen(".{$ts2} [$workerId] ".$log , true);
}
$worker->onWorkerStart = function() {
global $redis, $worker, $workerId , $rabbitmq_client;
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
//根據(jù)daemon順序延時,這是確保系統(tǒng)正常運行的關(guān)鍵
usleep(1000 * 100 * ($worker->id + 1));
$workerId = $worker->id ;
echo date("Y-m-d H:i:s") . " 服務(wù)進程{$workerId}已經(jīng)啟動!\n";
//連接到Redis服務(wù)器
$redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
//定期與redis握手,避免被斷掉,該動作每個進程都得執(zhí)行
Timer::add( 60 , function (){
global $redis, $worker;
$redis_status = $redis->ping();
if($redis_status == false){
$redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
}
});
$rabbitmq_config_option = array();
$rabbitmq_config_option['host'] = RABBITMQ_SERVER_IP;
$rabbitmq_config_option['port'] = RABBITMQ_SERVER_PORT;
$rabbitmq_config_option['user'] = RABBITMQ_USERNAME;
$rabbitmq_config_option['password'] = RABBITMQ_PASSWORD;
$rabbitmq_config_option['exchange_name'] = "4b_ads_CLASS_mqTest";
$rabbitmq_config_option['queen_name'] = "4b_ads_CLASS_mqTest";
$rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
//不斷嘗試與Rabbitmq服務(wù)器建立連接
while( $rabbitmq_client->is_connected == false ){
if($rabbitmq_client->connect() == true){
break;
}
$rabbitmq_client->app_log("rabbitmq server connect failed");
sleep(1);
}
//執(zhí)行定時握手任務(wù)
Timer::add( 55 , function ()use($rabbitmq_client) {
// 發(fā)送心跳數(shù)據(jù)
$rabbitmq_client->write_heartbeat();
});
Timer::add( 0.001 , function ()use($rabbitmq_client) {
if($rabbitmq_client->is_connected == true){
$data_json = GetRandStr(128);
$rabbitmq_msg = json_encode($data_json);
$rabbitmq_client->publish($rabbitmq_msg);
}
});
};
Worker::runAll();
消費者:
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
include_once("./Lib_global.php");
include_once("./Lib_functions_rabbitmq.php");
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Swoole\Coroutine;
$worker = new Worker();
//開啟進程數(shù)量
$worker->count = 2;
$processName = "test_mq_sub";
$worker->name = $processName;
$date_ymd = date("Y-m-d");
Worker::$pidFile = ROOT_PATH . "var/{$processName}.pid";
Worker::$logFile = ROOT_PATH . "var/{$processName}_logFile.log";
Worker::$stdoutFile = ROOT_PATH . "var/{$processName}_stdout.log";
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
$redis = "";
$is_debug = false; //全局配置,是否開啟調(diào)試模式
$rabbitmq_client = "";
function app_log($log)
{
global $workerId;
//將日志信息發(fā)送給日志服務(wù)器
$ts = round(microtime(true) - time(), 6);
@list($ts1, $ts2) = @explode(".", $ts);
//logToFile(".{$ts2} [$workerId] ".$log);
logToScreen(".{$ts2} [$workerId] " . $log, true);
}
$worker->onWorkerStart = function () {
global $redis, $worker, $workerId, $rabbitmq_client;
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
//根據(jù)daemon順序延時,這是確保系統(tǒng)正常運行的關(guān)鍵
usleep(1000 * 100 * ($worker->id + 1));
echo date("Y-m-d H:i:s") . " 服務(wù)進程{$worker->id}已經(jīng)啟動!\n";
$workerId = $worker->id ;
//連接到Redis服務(wù)器
$redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
//定期與redis握手,避免被斷掉,該動作每個進程都得執(zhí)行
Timer::add( 60 , function (){
global $redis, $worker;
$redis_status = $redis->ping();
if($redis_status == false){
$redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
}
});
//初始化Rabbitmq連接
$rabbitmq_config_option = array();
$rabbitmq_config_option['host'] = RABBITMQ_SERVER_IP;
$rabbitmq_config_option['port'] = RABBITMQ_SERVER_PORT;
$rabbitmq_config_option['user'] = RABBITMQ_USERNAME;
$rabbitmq_config_option['password'] = RABBITMQ_PASSWORD;
$rabbitmq_config_option['exchange_name'] = "4b_ads_CLASS_mqTest";
$rabbitmq_config_option['queen_name'] = "4b_ads_CLASS_mqTest";
$rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
//不斷嘗試與Rabbitmq服務(wù)器建立連接
while( $rabbitmq_client->is_connected == false ){
if($rabbitmq_client->connect() == true){
break;
}
$rabbitmq_client->app_log("rabbitmq server connect failed");
sleep(1);
}
//執(zhí)行定時握手任務(wù)
Timer::add( 55 , function ()use($rabbitmq_client) {
// 發(fā)送心跳數(shù)據(jù)
$rabbitmq_client->write_heartbeat();
});
$rabbitmq_client->comsume_callback = function ($msg){
//收到MQ消息
$message_body = $msg->body;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//echo "[x] Received ", $message_body, "\n";
};
//$rabbitmq_client->comsume_swoole_go(); //采用協(xié)程方式處理
$rabbitmq_client->comsume(); //傳統(tǒng)方式
};
Worker::runAll();
在使用時發(fā)現(xiàn)或多或少有一些問題(最大的問題是CPU搶占問題,導(dǎo)致workerman內(nèi)的基于定時任務(wù)長時間得不到執(zhí)行),畢竟官方的內(nèi)容都是同步機制的,我在想有沒有可能使用異步實現(xiàn)。
經(jīng)過長時間的研究,終于解決了這個問題,個人認為比官方基于bunny+React的方式更好使用一些。