這個(gè)問(wèn)題反復(fù)看了好幾次論壇里的回答還是無(wú)法解決。
大致邏輯如下:
1.前端用戶連接A服務(wù)器workerman的Websocket服務(wù)。
2.A服務(wù)器在Event中即充擔(dān)客戶端也擔(dān)任服務(wù)端,A服務(wù)器連接B服務(wù)的WebScoket服務(wù)。
3.B服務(wù)連接后會(huì)一直吐一段數(shù)據(jù)給A服務(wù)器,A服務(wù)器負(fù)責(zé)轉(zhuǎn)發(fā)給前端。
4.期間當(dāng)多個(gè)前端用戶同時(shí)使用A服務(wù)WebScoket服務(wù)時(shí),其中某個(gè)A服務(wù)器與B服務(wù)器的鏈接會(huì)被中斷,且不觸發(fā)onClose,本地調(diào)試和日志排查后無(wú)報(bào)錯(cuò)。且是一個(gè)概率事件。
5.想和大佬們討論下這個(gè)問(wèn)題應(yīng)該如何精準(zhǔn)定位和排查以及處理
private static $ws_connection;
public static function onWorkerStart($worker)
{
self::$ws_connection = new AsyncTcpConnection("ws://***.***.***.**:80");
}
public static function onConnect($client_id)
{
}
public static function onWebSocketConnect($client_id, $data)
{
}
public static function onMessage($client_id, $message)
{
/**
* $msg = [ ];
*
*/
$msg = json_decode($message,TRUE);
try {
/* <<連接遠(yuǎn)端socket>> Start */
// 以websocket協(xié)議連接遠(yuǎn)程websocket服務(wù)器
// 連上后發(fā)送數(shù)據(jù)
$sendData = [];
self::$ws_connection->onConnect = function(AsyncTcpConnection $connection)use ($sendData,$client_id){
$_SESSION['time_id'] = Timer::add(10,function () use($connection){
$connection->send('{"type":"ping"}');
});
static $is_first_connect = true;
if (!$is_first_connect) return;
$is_first_connect = false;
$connection->send(json_encode($sendData,JSON_UNESCAPED_UNICODE));
Log::info($client_id . '連接成功');
};
// 遠(yuǎn)程websocket服務(wù)器發(fā)來(lái)消息時(shí)
self::$ws_connection->onMessage = function(AsyncTcpConnection $connection, $dataMsg) use ($client_id,$data,$res){
Log::info($client_id . $dataMsg);
$dataMsg = json_decode($dataMsg,true);
if(isset($dataMsg['type']) && $dataMsg['type'] == 'msg'){
Gateway::sendToClient($client_id,json_encode($choices,JSON_UNESCAPED_UNICODE));
};
// 連接上發(fā)生錯(cuò)誤時(shí),一般是連接遠(yuǎn)程websocket服務(wù)器失敗錯(cuò)誤
self::$ws_connection->onError = function($connection, $code, $msg){
Log::error("error: $msg\n");
};
self::$ws_connection->onClose = function($connection) use ($client_id){
Log::info($client_id . '我下線了,重連\n');
};
// 設(shè)置好以上各種回調(diào)后,執(zhí)行連接操作
self::$ws_connection->connect();
/* <<連接遠(yuǎn)端socket>> End */
}catch (\Exception $e){
$res['code'] = $e->getCode();
$res['msg'] = $e->getMessage();
Gateway::sendToClient($client_id,json_encode($res,JSON_UNESCAPED_UNICODE));
return;
}
}
public static function onClose($client_id)
{
// Log::info($client_id . 'Close');
}
代碼有bug吧。
1、你代碼里用的self::$ws_connection,就是所有前端用戶共用一個(gè) self::$ws_connection,但是你代碼里又是在onMessage里不斷重置 self::$ws_connection。應(yīng)該是在onWorkerStart里設(shè)置self::$ws_connection才對(duì)。
2、沒(méi)有人保證連接永遠(yuǎn)不斷開(kāi),所以要在onClose里加一個(gè)reconnect()邏輯,斷開(kāi)自動(dòng)重連。前端代碼也是一樣。
其他都就是連接數(shù)超過(guò)1024要裝event擴(kuò)展,優(yōu)化linux內(nèi)核
我在服務(wù)端打印了 Gateway::isOnline($client_id) 文檔中寫(xiě)這個(gè)值是1的時(shí)候在線 0的時(shí)候就掉線了
你不是說(shuō) ”其中某個(gè)A服務(wù)器與B服務(wù)器的鏈接會(huì)被中斷“ 么?怎么會(huì)用$client_id判斷在線,$client_id是瀏覽器到gatewayWorker的鏈接id,不是A服務(wù)器與B服務(wù)器的鏈接。
A服務(wù)器到B服務(wù)器是通過(guò)AsyncTcpContent通過(guò)websocket鏈接的 所以是可以在B服務(wù)器看到A服務(wù)器與B服務(wù)器的鏈接是否在線的
onWorkerStart干的事情
/**
* 進(jìn)程啟動(dòng)時(shí)
* @param Worker $worker
* @return void
*/
public function onWorkerStart(Worker $worker): void
{
try {
$this->startChannel();
$this->startCrontab();
$this->startPublic();
$this->startPrivate();
//實(shí)時(shí)交易
TradeOrderHelper::start($this->websocket_api->getConfig());
} catch (Throwable $throwable) {
print_r(['【onWorkerStart系統(tǒng)異?!? . date('Y-m-d H:i:s') . ' 文件:' . __FILE__ . ' 行號(hào):' . __LINE__, $throwable->getMessage()]);
sleep(2);
Worker::stopAll();
}
}
啟動(dòng)公共連接:
/**
* @return void
* @throws Exception
*/
private function startPublic(): void
{
[$http, $public, $private] = $this->service_url;
$connection = new AsyncTcpConnection($public, $this->getContextOption());
$connection->setLabel(AsyncTcpConnection::LABEL_PUBLIC);
$this->bindConnection($connection);
$connection->connect();
}
設(shè)置異步連接:
/**
* 設(shè)置異步連接
* @param AsyncTcpConnection $con
* @return void
*/
private function bindConnection(AsyncTcpConnection $con): void
{
$con->setStartTime(time());
$con->account_name = $this->websocket_api->getConfig()->title;
$con->api_key = $this->websocket_api->getConfig()->api_key;
// 設(shè)置以ssl加密方式訪問(wèn)
$con->transport = 'ssl';
$con->onError = function (AsyncTcpConnection $connection, $err_code, $err_msg) {
//echo "$err_code, $err_msg\n";
};
$con->onClose = function (AsyncTcpConnection $connection) {
};
$con->onConnect = function (AsyncTcpConnection $con) {
$con->send(json_encode($this->websocket_api->login()));
$con->send('ping');
};
$con->onMessage = function (AsyncTcpConnection $con, $data) {
if (!$data) {
return;
}
try {
switch ($data) {
case 'pong': //心跳回應(yīng)
$filename = 'pong_' . $con->getLabel() . '_' . $con->api_key . '.log';
$account_name = $this->websocket_api->getConfig()->title;
file_put_contents($this->logsPath() . $filename, $account_name . ' 最后收到pong:' . date('Y-m-d H:i:s'));
$con->setLastPongTime(time());
break;
default:
$event = json_decode($data, true);
if (isset($event['event'])) {
//事件
//WssMessageService::listenEvent($event, $data, $this->websocket_api, $con);
$handler = EventFactory::make($event);
$handler->handle($event, $this->websocket_api, $con);
} else if (isset($event['arg']['channel'])) {
//頻道
//WssMessageService::listenChannel($event, $data, $this->websocket_api, $con);
$handler = ChannelFactory::make($event);
$handler->handle($event, $this->websocket_api, $con);
} else if (isset($event['id']) && isset($event['op'])) {
//交易
//WssMessageService::listenOp($event, $data, $this->websocket_api, $con);
$handler = OperationFactory::make($event);
$handler->handle($event, $this->websocket_api, $con);
}
break;
}
} catch (Throwable $throwable) {
fileDebug([($con->isPublicChannel() ? '【公共頻道】' : '【私有頻道】') . date('Y-m-d H:i:s'), '【onMessage業(yè)務(wù)異?!?文件:' . __FILE__ . ' 行號(hào):' . __LINE__, $throwable->getMessage(), $data], 'logs', 'onMessage_Throwable');
}
};
}
斷線重連邏輯:
/**
* 檢查ping狀態(tài),超時(shí)斷線重連
* @param AsyncTcpConnection $con
* @return void
* @throws Exception
*/
private function checkStatus(AsyncTcpConnection $con): void
{
try {
$ttl = config('okx.ping_interval', 20);
$last_pong_time = $con->getLastPongTime();
$start_time = $con->getStartTime();
if (empty($last_pong_time) && ($start_time + $ttl > time())) {
//剛啟動(dòng),還沒(méi)收到pong
return;
}
//超時(shí)判斷
if (($last_pong_time + $ttl * 1.3) < time()) {
RobotNotify::send('ping狀態(tài)異常正在重連', 'checkStatus:' . $con->getLabel() . PHP_EOL . ($con->account_name ?? ''));
//重連
$con->close();
if ($con->isPrivateChannel()) {
$this->startPrivate();
} else {
$this->startPublic();
}
}
} catch (Throwable $throwable) {
//通知開(kāi)發(fā)者
RobotNotify::send('重連異常', 'reconnect:' . $con->getLabel() . PHP_EOL . ($con->account_name ?? '') . PHP_EOL . $throwable->getMessage());
Worker::stopAll();
}
}