我通過如下代碼斷開連接
var_dump($connection->lastTime);
//lastTime在收到正常數(shù)據(jù)時會設(shè)置成當(dāng)前時間,用于判斷是否超時沒收到信息
if ($connection->lastTime < time() - 60 * 5) {
var_dump('超時關(guān)閉鏈接 '.$account['id']);
unset($connections[$account['id']]); //$connections 維護了多個用戶的鏈接
$connection->close();
}
但是斷開后發(fā)現(xiàn)原來的ping還在,服務(wù)端 還會回pong
Timer::add(15, function () use ($connection) {
$connection->send('{"op": "ping"}');
});
以下是完整代碼
global $worker;
$worker = new Worker();
$worker->name = 'ws_1';
$worker->count = 1;
// 進程啟動時
$worker->onWorkerStart = function ($worker) {
$pR = Predis::getInstance()->client;
Timer::add(1, function () use ($pR) {
// 用來存儲哪些用戶已經(jīng)建立連接
static $connections = [];
$aKey = 'ws_account_list';
$accounts = $pR->get($aKey);
$accounts = json_decode($accounts, true);
if (!$accounts) {
return;
}
foreach ($accounts as $account) {
$accountId = $account['id'];
// 判斷這個用戶是否建立連接
if (isset($connections[$accountId])) {
continue;
}
$ws_connection
= new AsyncTcpConnection("ws://xxx.com/ws/");
$ws_connection->transport = "ssl";
$connections[$accountId] = $ws_connection;
$ws_connection->onConnect = function ($connection) use (
$account,
&$connections
) {
var_dump("鏈接上了".$account['id']);
$connection->lastTime = time();
$time = time() * 1000;
$api_secret = $account['api_secret'];
$sig = bin2hex(hash_hmac("sha256",
$time."websocket_login",
$api_secret, true));
$subAcc = array_get($account, 'subaccount', '');
//登錄
$login = [
"op" => "login",
"args" => [
"key" => $account["api_key"],
"sign" => $sig,
"time" => $time,
],
];
if ($subAcc) {
$login['args']['subaccount'] = $subAcc;
}
$connection->send(json_encode($login));
//訂閱
//fill
$connection->send('{"op": "subscribe", "channel": "fills"}');
//orders
$connection->send('{"op": "subscribe", "channel": "orders"}');
Timer::add(15, function () use ($connection) {
$connection->send('{"op": "ping"}');
});
//超時檢查
Timer::add(30, function () use (
$connection,
&$connections,
$account
) {
var_dump($connection->lastTime);
if ($connection->lastTime < time() - 60 * 5) {
var_dump('超時關(guān)閉鏈接 '.$account['id']);
unset($connections[$account['id']]);
$connection->close();
}
});
};
$ws_connection->onMessage = function (
$connection,
$json
) use (
$account
) {
var_dump($json);
$data = json_decode($json, true);
$channel = array_get($data, 'channel', '');
$type = array_get($data, 'type', '');
if ($channel == 'fills' && $type == 'update') {
$connection->lastTime = time();
}
};
$ws_connection->onError = function (
$connection,
$code,
$msg
) {
echo "error: $msg\n";
};
$re = 0;
$ws_connection->onClose = function ($connection) use (&$re
) {
// sleep(1);
var_dump('關(guān)閉');
var_dump('重連');
// var_dump($connection);
// // 如果連接斷開,則在1秒后重連
// $r = $connection->reConnect(1);
// var_dump($r);
};
$ws_connection->connect();
}
});
};
Worker::runAll();
$connection->timer_id = Timer::add(15, function () use ($connection) {
$connection->send('{"op": "ping"}');
});
$connection->onClose = function($connection){
Timer::del($connection->timer_id);
};