$ws_worker->onConnect = function ($connection){
global $ws_worker;
$key = 'connection_'.SERVER_ID.'_'.$ws_worker->id.'_'.$connection->id;
//111111111111111111111111111111111111111111111111
Channel\Client::on($key, function($event_data)use($connection){
//222222222222222222222222
Channel\Client::on('aaaaaa', function($multi_data)use($connection){
$connection->send('訂閱的aaa事件');
});
$connection->send(‘111111111處訂閱的事件’);
});
};
在onstart中 給每個(gè)connection訂閱$key事件 $key中是唯一的 在以后中在222222處訂閱群發(fā)aaaaa事件 但是每次只有最后一個(gè)連接可以接受到aaaaaa事件 為什么
$ws_worker->onConnect = function ($connection){
global $ws_worker;
$key = 'connection_'.'_'.$ws_worker->id.'_'.$connection->id;
var_dump($key);
$connection->lastMessageTime = time();
Channel\Client::on($key, function($event_data)use(&$connection){
$connection->send('這個(gè)是單發(fā)'.date('Y-m-d H:i:s'));
});
Channel\Client::on('aaa', function($multi_data)use(&$connection){
echo "\n2\n";
$connection->send($multi_data);
});
};
我這樣在 onconnet中訂閱aaa事件 不是每個(gè)connection都可以接受到這個(gè)aaa事件嗎 目前我測(cè)試貌似只有2個(gè)可以收到 再多的鏈接就收不到aaa
你的代碼是在onconnet中讓Channel\Client訂閱aaa事件,而不是每個(gè)connection訂閱aaa事件。Channel\Client訂閱aaa事件是一個(gè)全局事件,代碼只不過(guò)是不停的更改Channel\Client訂閱aaa事件的回調(diào)函數(shù),但只有最后一個(gè)回調(diào)有效。
另外不停的給每個(gè)connection添加'connection_'.'_'.$ws_worker->id.'_'.$connection->id事件,鏈接關(guān)閉時(shí)又不解除事件監(jiān)聽(tīng),會(huì)導(dǎo)致對(duì)應(yīng)事件所占內(nèi)存一直被占用,內(nèi)存越來(lái)越大,形成內(nèi)存泄露。
群發(fā)做法類似下面做法:
require_once __DIR__ . '/../Workerman/Autoloader.php';
require_once __DIR__ . '/Channel/src/Server.php';
require_once __DIR__ . '/Channel/src/Client.php';
use Workerman\Worker;
$channel_server = new Channel\Server('0.0.0.0', 2206);
$worker = new Worker('websocket://0.0.0.0:1234');
// 全局群組到鏈接的映射數(shù)組
$group_con_map = array();
$worker->onWorkerStart = function(){
// Channel客戶端連接到Channel服務(wù)端
Channel\Client::connect('127.0.0.1', 2206);
// 監(jiān)聽(tīng)全局分組發(fā)送消息事件
Channel\Client::on('send_to_group', function($event_data){
$group_id = $event_data;
$message = $event_data;
global $group_con_map;
var_dump(array_keys($group_con_map));
if (isset($group_con_map)) {
foreach ($group_con_map as $con) {
$con->send($message);
}
}
});
};
$worker->onMessage = function($con, $data){
// 加入群組消息{"cmd":"add_group", "group_id":"123"}
// 或者 群發(fā)消息{"cmd":"send_to_group", "group_id":"123", "message":"這個(gè)是消息"}
$data = json_decode($data, true);
var_dump($data);
$cmd = $data;
$group_id = $data;
switch($cmd) {
// 鏈接加入群組
case "add_group":
global $group_con_map;
// 將鏈接加入到對(duì)應(yīng)的群組數(shù)組里
$group_con_map = $con;
// 記錄這個(gè)鏈接加入了哪些群組,方便在onclose的時(shí)候清理group_con_map對(duì)應(yīng)群組的數(shù)據(jù)
$con->group_id = isset($con->group_id) ? $con->group_id : array();
$con->group_id = $group_id;
break;
// 群發(fā)消息給群組
case "send_to_group":
// Channel\Client給所有服務(wù)器的所有進(jìn)程廣播分組發(fā)送消息事件
Channel\Client::publish('send_to_group', array(
'group_id'=>$group_id,
'message'=>$data
));
break;
}
};
// 這里很重要,鏈接關(guān)閉時(shí)把鏈接從全局群組數(shù)據(jù)中刪除,避免內(nèi)存泄漏
$worker->onClose = function($con){
global $group_con_map;
// 遍歷鏈接加入的所有群組,從group_con_map刪除對(duì)應(yīng)的數(shù)據(jù)
if (isset($con->group_id)) {
foreach ($con->group_id as $group_id) {
unset($group_con_map);
}
if (empty($group_con_map)) {
unset($group_con_map);
}
}
};
Worker::runAll();
workerman手冊(cè)加了這個(gè)例子 https://www.kancloud.cn/walkor/workerman/346075
其實(shí)群發(fā)最好用GatewayWorker框架,這些都是有現(xiàn)成的接口,直接調(diào)用就好了。