workerman queue 內(nèi)存使用率持續(xù)增高,直到服務(wù)器宕機(jī)。
剛開始的時(shí)候,單個(gè)進(jìn)程2.43M,然后就一直增高59.04M,直到服務(wù)器宕機(jī)。
我的服務(wù)端沒有產(chǎn)生任何的消息列隊(duì)。就空跑。
reload 后:
過一小段時(shí)間后:
// ######## 消息隊(duì)列消費(fèi)者 ########
$consumer = new Worker();
$consumer->name = 'ImJobConsumer';
// 消費(fèi)的隊(duì)列的id
$consumer->queueId = $QUEUE_ID;
// 慢任務(wù),消費(fèi)者的進(jìn)程數(shù)可以開多一些
$consumer->count = 16;
/**
* 進(jìn)程啟動(dòng)阻塞式的從隊(duì)列中讀取數(shù)據(jù)并處理
*/
$consumer->onWorkerStart = function($consumer)
{
// 獲得隊(duì)列資源
$consumer->queue = msg_get_queue($consumer->queueId);
\Workerman\Lib\Timer::add(0.5, function() use ($consumer){
if(extension_loaded('sysvmsg'))
{
// 循環(huán)取數(shù)據(jù)
while(1)
{
$desiredmsgtype = 1;
$msgtype = 0;
$message = '';
$maxsize = 65535;
// 從隊(duì)列中獲取消息 @see http://php.net/manual/zh/function.msg-receive.php
@msg_receive($consumer->queue , $desiredmsgtype , $msgtype , $maxsize , $message, true, MSG_IPC_NOWAIT);
if(!$message)
{
return;
}
// 假設(shè)消息數(shù)據(jù)為json,格式類似{"class":"class_name", "method":"method_name", "args":[]}
$message = json_decode($message, true);
// 格式如果是正確的,則嘗試執(zhí)行對(duì)應(yīng)的類方法
if(isset($message['class']) && isset($message['method']) && isset($message['args']))
{
// 要調(diào)用的類名,加上Consumer命名空間
$class_name = "\\Consumer\\".$message['class'];
// 要調(diào)用的方法名
$method = $message['method'];
// 調(diào)用參數(shù),是個(gè)數(shù)組
$args = (array)$message['args'];
// 類存在則嘗試執(zhí)行
if(class_exists($class_name))
{
$class = new $class_name;
$callback = array($class, $method);
if(is_callable($callback))
{
call_user_func_array($callback, $args);
}
else
{
echo "$class_name::$method not exist\n";
}
}
else
{
echo "$class_name not exist\n";
}
}
else
{
echo "unknow message\n";
}
}
}
});
};
以守護(hù)進(jìn)程的方式啟動(dòng),然后不斷的查看 status,會(huì)以肉眼可見的速度增長(zhǎng)。
Workerman version:4.1.8
PHP version:8.1.7
Distributor ID: Ubuntu
Description: Ubuntu 20.04.4 LTS
Release: 20.04
Codename: focal
我是在消費(fèi)列隊(duì)信息的時(shí)候,調(diào)用Phalcon的Task.
$app = Bootstrap::handle()->app();
由于不斷給變量賦值,并沒有釋放內(nèi)存。
現(xiàn)在為:
$consumer->onWorkerStart = function($consumer)
{
// 獲得隊(duì)列資源
$consumer->queue = msg_get_queue($consumer->queueId);
$app = Bootstrap::handle()->app();
\Workerman\Lib\Timer::add(0.5, function() use ($consumer,$app){
if(extension_loaded('sysvmsg')) {
$desiredmsgtype = 1;
$msgtype = 0;
$message = '';
$maxsize = 65535;
@msg_receive(
$consumer->queue ,
$desiredmsgtype ,
$msgtype ,
$maxsize ,
$message,
true,
MSG_IPC_NOWAIT
);
在onWorkerStart里每0.5s跑個(gè)while(1)我是沒想到的