使用GatewayWorker同時做客戶端和服務(wù)端。
做客戶端使用異步請求一個ws連接,不斷接受消息,將其發(fā)送到Channel通道中。
做服務(wù)端訂閱Channel通道,將消息轉(zhuǎn)發(fā)給普通用戶連接。
我的問題不是如何實現(xiàn),而是如何在服務(wù)器配置有限的情況下,盡最大可能提升并發(fā)量。
服務(wù)器配置是4核4G高性能云服務(wù)器,目前不使用階梯式遞增壓測,最高并發(fā)可維持在1w左右。
超過1w,就會exit_process進程,殺掉一些連接。
我想知道的是,請問大家還有什么方案來實現(xiàn)嗎?
我目前還沒有加上數(shù)據(jù)處理部分,我想加上數(shù)據(jù)處理部分并發(fā)又會下降不少。
需要實時轉(zhuǎn)發(fā)的數(shù)據(jù)處理部分是否可以通過task來實現(xiàn)?
使用協(xié)程可以來處理不是需要實時轉(zhuǎn)發(fā)的數(shù)據(jù),比如把接收到的數(shù)據(jù)存儲到數(shù)據(jù)庫這些阻塞操作?
開啟4個Gateway進程,12個Business進程,1個Channel進程,1個Register進程。
目前有四個方案:
第一個是單一訂閱頻道&直接sendToAll消息。
第二個是在onConnect中加入一個連接分組策略,多個訂閱頻道&sendToGroup消息。
第三個是在onWorkerStart加入一個進程分組策略,即異步連接只產(chǎn)生在一個進程A中,通過心跳來判斷進程A是否一直正常處理,不正常時打開第二個進程B來處理消息,如此反復(fù)。其他同第二個方案一樣。
第四個是把客戶端和服務(wù)端進行分布式部署。異步請求以及發(fā)送消息給通道的操作 與 用戶連接訂閱通道消息并發(fā)送消息給用戶 完全分開部署。
第一個方案代碼如下:
public static function onWorkerStart($businessWorker)
{
// 異步建立連接
if ($businessWorker->id === 0) {
// 連接通道
Channel\Client::connect('0.0.0.0', 2206);
// 創(chuàng)建異步連接
$connection = new AsyncTcpConnection('ws://101.82.23.49:9765');
// 設(shè)置ssl
$connection->transport = 'ssl';
// 設(shè)置成功連接時
$connection->onConnect = function ($con) {
// 發(fā)送消息
$con->send('{"op": "subscribe","args":[]}');
};
// 設(shè)置成功接收到消息時
$connection->onMessage = function ($connection, $data) {
// 轉(zhuǎn)發(fā)消息給通道接受
Channel\Client::publish('data_channel', $data);
};
// 執(zhí)行異步連接
$connection->connect()
}
// 訂閱心跳信號
Channel\Client::on('data_channel', function($data) {
Gateway::sendToAll($data);
});
}
第二個方案代碼如下:
use \Workerman\Redis\Connection as RedisConnection;
public static function onConnect($client_id)
{
$redis = new RedisConnection('localhost', 6379);
$group_ans = $redis->get('group_ans') ?: 0;
$increment = $redis->get('increment') ?: 1;
Label:
if ($group_ans > self::MAX_GROUP - 1) {
// group
$group_ans = 0;
// 增量
$increment++;
$redis->set('increment', $increment);
}
if (Gateway::getClientCountByGroup('group_' . $group_ans) > (int)(self::MIN_USERS * $increment)) {
// group
$group_ans++;
$redis->set('group_ans', $group_ans);
// callback
goto Label;
}
Gateway::joinGroup($client_id, 'group_' . $group_ans);
}
第三個方案代碼如下:
use \Workerman\Redis\Connection as RedisConnection;
public static function onWorkerStart($businessWorker)
{
// 連接Redis
self::$redis = new Redis();
self::$redis->connect('localhost', 6379);
// 連接通道
Channel\Client::connect('0.0.0.0', 2206);
// 如果是0號進程,作為主進程
if ($businessWorker->id === 0) {
// 獲取數(shù)據(jù)的代碼...
self::getData($businessWorker);
}
// 如果是備份進程
if ($businessWorker->id >= 1 && $businessWorker->id <= self::MAX_GROUP - 1) {
// 設(shè)置
self::$last_heartbeat_time[$businessWorker->id] = time();
// 訂閱心跳信號
Channel\Client::on('heart_channel', function($data) use ($businessWorker) {
self::$last_heartbeat_time[$businessWorker->id] = time();
});
// 定期檢查心跳信號
Timer::add(3, function() use ($businessWorker) {
// 如果1秒鐘沒有收到上一個進程的心跳信號,并且成功獲取了鎖
if (time() - self::$last_heartbeat_time[$businessWorker->id] > 1 && self::$redis->set('lock', 1, ['nx', 'ex' => 3])) {
// 開始獲取數(shù)據(jù)的代碼...
self::getData($businessWorker);
}
});
}
// 訂閱通道數(shù)據(jù)
Channel\Client::on('data_channel', function($data) use ($businessWorker) {
// 發(fā)送
Gateway::sendToGroup('group_' . $businessWorker->id, $data);
});
}
需要更改系統(tǒng)策略,比如超過一定用戶鏈接數(shù)之后,使用讀取緩存的方式獲取消息,例如:使用redis緩存一下積累的消息,使用http方式獲取一下,當然還有其他方式,如果只是單純的去轉(zhuǎn)發(fā)消息,很容易引發(fā)消息風(fēng)暴,導(dǎo)致帶寬跑滿