我有一個(gè)場(chǎng)景,像是車場(chǎng)道閘那種
1.gateway-worker作為服務(wù)端,然后道閘系統(tǒng)是客戶端,與服務(wù)端建立tcp長(zhǎng)連接。(客戶端會(huì)向服務(wù)端發(fā)送心跳,5s/次)
2.用戶請(qǐng)求服務(wù)端,獲取道閘上的金額,進(jìn)行支付
假設(shè)道閘客戶端連接到gateway-worker的client_id是001,車場(chǎng)的客戶端編號(hào)是A,那在道閘連接上服務(wù)端的時(shí)候,會(huì)進(jìn)行client_id 001和編號(hào)A的綁定Gateway::bindUid(001, A)
。那后續(xù)有用戶發(fā)起http請(qǐng)求獲取金額的時(shí)候,請(qǐng)求中也會(huì)傳遞車場(chǎng)編號(hào)A,那服務(wù)端會(huì)向客戶端A發(fā)送tcp請(qǐng)求獲取金額。
現(xiàn)在有一種情況,就是道閘系統(tǒng)有時(shí)候網(wǎng)絡(luò)不好,老是斷網(wǎng),斷網(wǎng)情況下,道閘客戶端就沒辦法向服務(wù)端及時(shí)發(fā)送心跳包,服務(wù)端也不知道這個(gè)客戶端A離線了,這時(shí)候如果有http請(qǐng)求進(jìn)來,服務(wù)端會(huì)繼續(xù)向客戶端A發(fā)送請(qǐng)求獲取金額,這時(shí)候就會(huì)導(dǎo)致進(jìn)程阻塞,導(dǎo)致http請(qǐng)求一直在請(qǐng)求中,直接影響到后續(xù)的http請(qǐng)求也進(jìn)不來
想問下,這種情況應(yīng)該如何處理呢?
1、在向客戶端A發(fā)送請(qǐng)求前,檢測(cè)客戶端A是否離線
使用Gateway::isOnline(string $client_id)
先檢測(cè)客戶端A狀態(tài)。但是這種方法建立在client_id觸發(fā)了onClose回調(diào),像斷網(wǎng)這種情況,客戶端是沒辦法觸發(fā)onClose回調(diào)
2、服務(wù)端向客戶端發(fā)送心跳包檢測(cè)客戶端狀態(tài)
我看官方文檔可以通過服務(wù)端向客戶端發(fā)送心跳包檢測(cè)客戶端狀態(tài),假設(shè)我設(shè)置心跳包3s/次,但是這種也會(huì)存在心跳包剛檢測(cè)客戶端正常,過了1s客戶端因?yàn)閿嗑W(wǎng)離線了,這時(shí)候有http請(qǐng)求進(jìn)來,服務(wù)端還不知道客戶端連接不上了,還是會(huì)出現(xiàn)上面我所說的情況
想問下大家有更合適的方案嗎?
安裝channel
composer require workerman/channel
start.php
<?php
use Workerman\Worker;
define('GLOBAL_START', 1);
require_once __DIR__ . '/vendor/autoload.php';
// 加載所有Applications/*/start.php,以便啟動(dòng)所有服務(wù)
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{
require_once $start_file;
}
// channel服務(wù)用來多進(jìn)程或者跨服務(wù)器通訊
$channel_server = new Channel\Server('0.0.0.0', 2206);
// 保存設(shè)備到connection的映射
global $connection_maps;
$connection_maps = [];
$http_worker = new Worker('http://0.0.0.0:1234');
$http_worker->onWorkerStart = function () {
// Channel客戶端連接到Channel服務(wù)端
Channel\Client::connect('127.0.0.1', 2206);
Channel\Client::on('get_amount_result', function($data) {
global $connection_maps;
$device_id = $data['device_id'];
foreach ($connection_maps[$device_id]??[] as $connection) {
$connection->close($data['amount']);
}
});
};
$http_worker->onMessage = function (\Workerman\Connection\TcpConnection $connection, \Workerman\Protocols\Http\Request $request) {
global $connection_maps;
$device_id = $request->get('device_id');
if (!$device_id) {
$connection->send('not found');
return;
}
$connection->device_id = $device_id;
$connection_maps[$device_id][$connection->id] = $connection;
// 通過channel向gatewayWorker咨詢客戶端金額
Channel\Client::publish('get_amount', [
'device_id' => $device_id
]);
};
$http_worker->onClose = function (\Workerman\Connection\TcpConnection $connection) {
if (empty($connection->device_id)) {
return;
}
// 刪除$connection_maps對(duì)應(yīng)的連接,避免內(nèi)存泄漏
$device_id = $connection->device_id;
global $connection_maps;
unset($connection_maps[$device_id][$connection->id]);
if (empty($connection_maps[$device_id])) {
unset($connection_maps[$device_id]);
}
};
// 運(yùn)行所有服務(wù)
Worker::runAll();
Events.php
<?php
use \GatewayWorker\Lib\Gateway;
class Events
{
public static function onWorkerStart($worker)
{
// Channel客戶端連接到Channel服務(wù)端
Channel\Client::connect('127.0.0.1', 2206);
// 只需要在0號(hào)進(jìn)程上開啟get_amount監(jiān)聽
if ($worker->id !== 0) {
Channel\Client::on('get_amount', function($data) {
$device_id = $data['device_id'];
Gateway::sendToUid($device_id, 'get_amount');
});
}
}
public static function onMessage($client_id, $message)
{
// 忽略客戶端心跳
if ($message === '{"type":"ping"}') {
return;
}
// 假設(shè)客戶端發(fā)的第一個(gè)消息當(dāng)作device_id
if (empty($_SESSION['device_id'])) {
$device_id = trim($message);
Gateway::bindUid($client_id, $device_id);
$_SESSION['device_id'] = $device_id;
return;
}
// 客戶端后續(xù)發(fā)的消息當(dāng)作金額
$amount = $message;
// 通知http進(jìn)程得到device_id的金額
Channel\Client::publish('get_amount_result', [
'device_id' => $_SESSION['device_id'],
'amount' => $amount
]);
}
}
php start.php start
瀏覽器打開頁(yè)面 http://127.0.0.1:1234/?device_id=d2
開啟一個(gè)終端,輸入
telnet 127.0.0.1 8282
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
d2
800
提示
d2代表設(shè)備id
800代表金額
瀏覽器顯示 800
啟動(dòng)一個(gè)channel server作為多進(jìn)程或者跨服務(wù)器通訊組件
businessWorker的Events里onWorkerStart連接channelserver并監(jiān)聽http進(jìn)程發(fā)來的get_amount請(qǐng)求(請(qǐng)求中包含設(shè)備id)
啟動(dòng)一個(gè)httpworker作為http接口,onWorkerStart里連接channelserver并監(jiān)聽get_amount_result結(jié)果(結(jié)果中包含設(shè)備id)
瀏覽器向httpworker發(fā)起請(qǐng)求,httpworker獲得要查詢的設(shè)備id,并將連接保存到connection_maps中,然后通過channel發(fā)布一個(gè)get_amount事件給businessWorke的Events.php
設(shè)備返回金額后在Events里的onMessage里通過channel發(fā)送get_amount_result事件通知http進(jìn)程對(duì)應(yīng)的設(shè)備返回了金額
httpworker獲得金額后查找本地連接里($connection_maps)是否有查詢對(duì)應(yīng)設(shè)備金額的連接,有的話返回金額