問(wèn)題描述
如圖,calltask_init是異步任務(wù)服務(wù)端用來(lái)執(zhí)行耗時(shí)任務(wù),text協(xié)議,進(jìn)程數(shù)開(kāi)12,async_task_proxy為異步任務(wù)客戶端,websocket協(xié)議,進(jìn)程數(shù)開(kāi)1,我在controller里做http接口,觸發(fā)異步任務(wù)時(shí)直接連接async_task_proxy并指定要執(zhí)行的異步任務(wù)。
結(jié)果我連接調(diào)接口8次,卻只有5個(gè)任務(wù)開(kāi)始執(zhí)行了,另外3個(gè)任務(wù)為等待狀態(tài),具體哪個(gè)任務(wù)執(zhí)行,目前沒(méi)發(fā)現(xiàn)規(guī)律。
我希望得到的結(jié)果是:異步任務(wù)進(jìn)程數(shù)開(kāi)12的話,那么只有第13個(gè)調(diào)用時(shí)才會(huì)發(fā)生任務(wù)等待。
運(yùn)行環(huán)境
cat /etc/redhat-release
CentOS Linux release 7.9.2009 (Core)
Workerman version:4.0.33 PHP version:7.4.28
{
"name": "workerman/webman-framework",
"version": "v1.3.9",
}
重現(xiàn)步驟
class TaskInit
{
//...
public function onMessage(TcpConnection $connection, $data)
{
$workerId = $connection->worker->id;
// 只接收json字符串
$data = json_decode($data, true);
$taskService = new \app\service\CallTask();
// 調(diào)\app\service\CallTask中的init方法(耗時(shí))
$taskRes = $taskService->init($data['task_id']);
$connection->send(json_encode($taskRes, JSON_UNESCAPED_UNICODE));
}
//...
}
class AsyncTaskProxy
{
// ...
public function onMessage(TcpConnection $connection, $data)
{
$workerId = $connection->worker->id;
var_dump("proxy: " . $data);
// 接收客戶端發(fā)來(lái)的指令
$data = json_decode($data, true);
if (isset($data['command']) && isset($data['payload'])) {
switch ($data['command']) {
case 'init':
// 指令為“初始化呼叫任務(wù)”
$taskId = $data['payload']['task_id'];
$taskConnection = new AsyncTcpConnection('text://127.0.0.1:9611');
$taskData = array(
'task_id' => $taskId,
);
$taskConnection->send(json_encode($taskData, JSON_UNESCAPED_UNICODE));
$taskConnection->onMessage = function (AsyncTcpConnection $taskConnection, $taskRes) use ($connection) {
var_dump('ws代理中的onMessage:');
var_dump($taskRes);
$taskConnection->close();
$connection->send($taskRes);
};
$taskConnection->connect();
break;
}
}
}
// ...
}
/* process.php */
return [
// ...
'calltask_init' => [
// rabbitmq的消費(fèi)者示例
'handler' => process\TaskInit::class,
// 監(jiān)聽(tīng)的協(xié)議 ip 及端口 (可選)
'listen' => 'text://0.0.0.0:9611',
// 進(jìn)程數(shù) (可選,默認(rèn)1)
'count' => 12,
// 是否開(kāi)啟reusePort (可選,此選項(xiàng)需要php>=7.0,默認(rèn)為true)
'reusePort' => true,
],
'async_task_proxy' => [
// rabbitmq的消費(fèi)者示例
'handler' => process\AsyncTaskProxy::class,
// 監(jiān)聽(tīng)的協(xié)議 ip 及端口 (可選)
'listen' => 'websocket://0.0.0.0:9608',
// 進(jìn)程數(shù) (可選,默認(rèn)1)
'count' => 1,
// 是否開(kāi)啟reusePort (可選,此選項(xiàng)需要php>=7.0,默認(rèn)為true)
'reusePort' => true,
]
];
感謝walkor老大的及時(shí)回復(fù)
通過(guò)在TaskInit的onConnect中打印posix_getpid()確實(shí)發(fā)現(xiàn)了問(wèn)題:
發(fā)起9次請(qǐng)求,其中3次并沒(méi)有觸發(fā)onConnect方法。
我也繼續(xù)排查一下。
給TaskInit 進(jìn)程設(shè)置一個(gè)onConnect方法,onConnect里打印下進(jìn)程pid,也能看出來(lái)連接的進(jìn)程pid。
public function onConnect()
{
echo posix_getpid()."\n";
}
或者
用命令netstat -nt | grep ESTABLISHED | grep 9611
能找出連接的本地端口,
然后用命令 lsof -i:本地端口
能找到連接的是哪個(gè)進(jìn)程 pid
確實(shí)reusePort設(shè)置為false后問(wèn)題就解決了,只不過(guò)我還沒(méi)想明白為什么,復(fù)用端口設(shè)置為false,就會(huì)優(yōu)先選擇空閑進(jìn)程,而為true時(shí)就會(huì)優(yōu)先選擇已經(jīng)連接上進(jìn)程。
reusePort 為 true,由linux內(nèi)核分配連接到進(jìn)程,可能你的linux內(nèi)核沒(méi)有做到100%平均分配。
reusePort為false,哪個(gè)進(jìn)程空閑,哪個(gè)進(jìn)程去認(rèn)領(lǐng)連接