Workerman version:4.1.15
PHP version:7.3
windows server 環(huán)境
connectToRemote 后運行一段時間(幾小時~2天)就無法接收發(fā)送消息,會出現(xiàn): sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready.
沒有connectToRemote不會有問題,請問這是什么問題,有什么解決辦法
namespace App\Http\Socket;
use App\Http\Controllers\PtTestController;
use Exception;
use GatewayWorker\Lib\Gateway;
use Illuminate\Http\Request;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;
class Events
{
private static $connection = null;
private static $retryTimer = null;
private static $remoteAddress = null;
private static $externalID = [];
public static function onWebSocketConnect($client_id, $data)
{
if (isset($data['get']['client'])) {
$ids = Gateway::getClientIdByUid($data['get']['client']);
if (!empty($ids)) {
foreach ($ids as $id) {
if (Gateway::isOnline($id)) {
Gateway::closeClient($id);
}
}
}
Gateway::bindUid($client_id, $data['get']['client']);
}
}
public static function onMessage(string $client_id, $message)
{
try {
$msg = json_decode($message);
$op = $msg->op ?? false;
$data = $msg->data ?? null;
$deviceID = $data->deviceID ?? null;
$type = $data->type ?? null;
$idx = $data->idx ?? null;
switch ($op) {
case 'Login':
if ($type === 'app_server') {
Gateway::bindUid($client_id, 'app_server');
// 鏈接至其他websocket
self::$remoteAddress = config('app.extranet_websocket');
if (self::$remoteAddress) {
self::connectToRemote();
}
}
break;
case 'ScanQR':
if ($idx) {
self::sendMessage($idx, $message);
}
break;
case 'AppMsg':
if ($deviceID) {
self::sendMessage($deviceID, $message);
}
break;
default:
if (Gateway::getUidByClientId($client_id) === 'app_server') {
if ($deviceID) {
self::sendMessage($deviceID, $message);
}
} else {
Gateway::sendToUid('app_server', $message);
}
break;
}
} catch (Exception $e) {
echo "Error in onMessage: " . $e->getMessage() . "\n";
}
}
public static function connectToRemote()
{
try {
self::$connection = null;
self::$connection = new AsyncTcpConnection(self::$remoteAddress);
} catch (Exception $e) {
echo 'Async tcp connection error: ' . $e->getMessage() . "\n";
}
self::$connection->onConnect = function($connection) {
try {
self::$connection->send(json_encode(['op' => 'Login', 'data' => ['type' => 'intranet_socket']]));
if (self::$retryTimer) {
Timer::del(self::$retryTimer);
self::$retryTimer = null;
}
} catch (Exception $e) {
echo 'Error during connection send: ' . $e->getMessage() . "\n";
}
};
self::$connection->onMessage = function($connection, $data) {
try {
$msg = json_decode($data, true);
$op = $msg['op'] ?? false;
$type = $msg['type'] ?? null;
if ($op === 'ExternalLogin') {
if (isset($msg['clientID']) && !in_array($msg['clientID'], self::$externalID)) {
self::$externalID[] = $msg['clientID'];
}
} else if ($op === 'ExternalLogout') {
if (isset($msg['clientID'])) {
$key = array_search($msg['clientID'], self::$externalID);
if ($key !== false) {
unset(self::$externalID[$key]);
self::$externalID = array_values(self::$externalID);
}
}
} else if ($type === 'to_controller') {
try {
$request = new Request($msg['data']);
$controller = new PtTestController();
if (method_exists($controller, $op)) {
$res = $controller->$op($request);
$message = json_encode([
'op' => $msg['op'],
'type' => 'to_controller',
'data' => $msg['data'],
'msg' => $res['msg'] ?? $res,
'code' => $res['code'] ?? 0,
]);
self::isConnectedSend($message);
}
} catch (Exception $e) {
$message = json_encode([
'op' => $msg['op'],
'type' => 'to_controller',
'data' => $msg['data'],
'msg' => $e->getMessage()
]);
self::isConnectedSend($message);
}
} else {
Gateway::sendToUid('app_server', $data);
}
} catch (Exception $e) {
echo 'Error in onMessage (remote connection): ' . $e->getMessage() . "\n";
}
};
self::$connection->onClose = function() {
if (self::$externalID) {
self::$externalID = [];
}
self::reconnect();
};
self::$connection->onError = function($connection, $code, $msg) {
if (self::$externalID) {
self::$externalID = [];
}
self::reconnect();
};
self::$connection->connect();
}
public static function reconnect()
{
if (self::$retryTimer) {
return;
}
self::$retryTimer = Timer::add(30, function() {
self::connectToRemote();
});
}
public static function sendMessage($recipientID, $message)
{
try {
$isExternal = in_array($recipientID, self::$externalID);
if ($isExternal) {
self::isConnectedSend($message);
} else {
Gateway::sendToUid($recipientID, $message);
}
} catch (Exception $e) {
echo "Error in sendMessage: " . $e->getMessage() . "\n";
}
}
public static function isConnectedSend($message)
{
try {
if (self::$connection) {
self::$connection->send($message);
} else {
self::reconnect();
}
} catch (Exception $e) {
echo "Error in sendMessage: " . $e->getMessage() . "\n";
}
}
}
sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready.
Workerman version:4.1.15
PHP version:7.3
windows server 環(huán)境
connectToRemote 里不斷創(chuàng)建連接又不關(guān)閉,導(dǎo)致資源達(dá)到上限了
public static function connectToRemote()
{
// 如果連接已存在,創(chuàng)建新連接之前關(guān)閉它
if (self::$connection !== null) {
self::$connection->close();
self::$connection = null;
}
// 創(chuàng)建到遠(yuǎn)程服務(wù)器的異步TCP連接
self::$connection = new AsyncTcpConnection(self::$remoteAddress);
...
請問是這樣解決嗎
self::$connection->onError = function($connection, $code, $msg) {
echo "Error: $msg\n";
};
在請教下,這個出錯時的回調(diào)里面什么都不寫,會有問題嗎?會出現(xiàn): sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. 這個錯誤嗎?
因為在$connection->onError里之前是有錯誤重連,導(dǎo)致網(wǎng)絡(luò)不好經(jīng)常出現(xiàn) Error: send buffer full and drop package 就會重新鏈接
多謝大佬的幫助,現(xiàn)在有個問題就是通過self::$connection->send($message) 發(fā)送老是send buffer full and drop package,這個問題怎么解決呢,辛苦大佬幫忙看下,ai的回答也解決不了
public static function sendToRemote($message)
{
try {
self::$messageQueue[] = $message;
if (!self::$isSending) {
self::processQueue();
}
} catch (Exception $e) {
echo "Error in sendMessage: " . $e->getMessage() . "\n";
}
}
private static function processQueue()
{
if (empty(self::$messageQueue) || self::$isSending) {
return;
}
self::$isSending = true;
$message = array_shift(self::$messageQueue);
try {
if (self::$connection) {
self::$connection->send($message);
Timer::add(0.01, function () {
self::$isSending = false;
self::processQueue();
}, [], false);
} else {
self::reconnect();
self::$isSending = false;
}
} catch (Exception $e) {
echo "Error in processQueue: " . $e->getMessage() . "\n";
self::$isSending = false;
}
}