自定義的文本格式協(xié)議,格式很簡(jiǎn)單,用\r\n\r\n來(lái)分隔數(shù)據(jù)。代碼見(jiàn)最下方。
現(xiàn)在的情況是這樣子的,我通過(guò)tcp客戶(hù)端發(fā)起一次請(qǐng)求,傳輸5個(gè)文本包,服務(wù)端的日志來(lái)看,這5個(gè)包確實(shí)都接收到了(protocol的input中的日志被打印出來(lái)了),但是并沒(méi)有觸發(fā)onMessage的操作。
進(jìn)一步定位,protocol中的decode函數(shù)壓根都沒(méi)有被調(diào)用,下面輸出的內(nèi)容中可以看到,5個(gè)包都正常收到了,解析length也正確,就是沒(méi)有觸發(fā)decode操作。。。
++++++++++++++++++++++++++++ 輸出 +++++++++++++++++++++++++++
----------------------- WORKERMAN -----------------------------
Workerman version:3.3.1 PHP version:5.5.25
------------------------ WORKERS -------------------------------
user worker listen processes status
root workerman_logger orzText://0.0.0.0:8300 4
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.
string(11) "length: 223"
string(9) "decode..."
string(12) "onMessage..."
string(11) "length: 223"
string(9) "decode..."
string(12) "onMessage..."
string(11) "length: 223"
string(11) "length: 223"
string(11) "length: 223"
++++++++++++++++++++++++++++ 代碼 +++++++++++++++++++++++++++
<?php
namespace Workerman\Protocols;
use Workerman\Connection\TcpConnection;
class OrzText
{
public static $SEPARATOR = "\r\n\r\n";
public static $SEPARATOR_LENGTH = 4;
public static function input($buffer, TcpConnection $connection)
{
if (strlen($buffer) >= TcpConnection::$maxPackageSize) {
$connection->close();
return 0;
}
$pos = strpos($buffer, self::$SEPARATOR);
if ($pos === false) {
return 0;
}
$length = $pos + self::$SEPARATOR_LENGTH;
var_dump("length: " . $length);
return $length;
}
public static function encode($buffer)
{
return $buffer . self::$SEPARATOR;
}
public static function decode($buffer)
{
var_dump("decode...");
return trim($buffer);
}
}
require_once(dirname(__FILE__) . '/CommandBaseManager.class.php');
Vendor('Workerman.Autoloader', '', '.php');
Vendor('GatewayWorker.Lib.Db', '', '.php');
use \Workerman\Worker;
use \GatewayWorker\Lib\Db;
abstract class WorkermanBaseManager extends CommandBaseManager
{
protected static $config = null;
protected static $dbs = array();
protected static $db = null;
public function _initialize()
{
parent::_initialize();
Db::set_config(0, array(
'host' => C('DB_HOST'),
'port' => C('DB_PORT'),
'user' => C('DB_USER'),
'password' => C('DB_PWD'),
'dbname' => C('DB_NAME'),
));
self::$dbs = Db::instance(0);
self::$db = self::$dbs;
}
public function onWorkerStart($task)
{
$this->_logger()->debug('name: ' . $task->name);
return true;
}
public function onConnect($connection)
{
$this->_logger()->debug('from ip ' . $connection->getRemoteIp());
return true;
}
public function onMessage($connection, $data)
{
static $i = 0;
var_dump("onMessage..." . (++$i));
}
public function onClose($connection)
{
$this->_logger()->debug('from ip ' . $connection->getRemoteIp());
return true;
}
protected function _work()
{
$config = array(
'listen' => 'OrzText://0.0.0.0:8300',
'name' => 'workerman_logger',
'count' => 4,
'logFile' => 'Log/Command/logger_run.log',
);
$worker = new Worker(self::$config);
Worker::$logFile = REAL_ROOT_PATH . self::$config;
$worker->count = self::$config;
$worker->name = self::$config;
$worker->onMessage = array($this, 'onMessage');
$worker->onConnect = array($this, 'onConnect');
$worker->onClose = array($this, 'onClose');
$worker->onWorkerStart = array($this, 'onWorkerStart');
$worker->onWorkStop = array($this, 'onWorkerStop');
$worker->onBufferFull = array($this, 'onBufferFull');
$worker->onBufferDrain = array($this, 'onBufferDrain');
$worker->onError = array($this, 'onError');
Worker::runAll();
}
// 一般不需要做修改的一些回調(diào)
public function onBufferFull($connection)
{
$this->_logger()->warn('do not send again');
}
public function onBufferDrain($connection)
{
$this->_logger()->warn('send again');
}
public function onWorkerStop($worker)
{
$log = sprintf('worker stop: %s', $worker->id, $worker->name);
$this->_logger()->debug($log);
}
public function onError($connection, $code, $msg)
{
$log = sprintf('from ip %s, error: ', $connection->getRemoteIp(), $code, $msg);
$this->_logger()->error($log);
}
}
<?php
$host = 'localhost';
$port = 8300;
$timeout = 1;
$eof = "\r\n\r\n";
$socket = fsockopen($host, $port, $errno, $errstr, $timeout);
for ($i = 0; $i < 20; $i++)
{
fwrite($socket, 'testing' . $eof);
}
fclose($socket);
echo "done\n";
在群里得到了各位的幫助,最后又多次調(diào)試代碼,發(fā)現(xiàn)了原因:客戶(hù)端連接之后發(fā)送數(shù)據(jù),在服務(wù)端的onMessage中我用了$connection->send(消息),而當(dāng)服務(wù)端send失敗,就把connection destroy掉了,即使這個(gè)時(shí)候服務(wù)端已經(jīng)收到了后面的數(shù)據(jù),也不會(huì)再做處理了。
是的,客戶(hù)端瞬間發(fā)送多個(gè)消息就立刻關(guān)閉了連接,你的代碼中服務(wù)端收到消息處理的同時(shí)向客戶(hù)端send消息,但是客戶(hù)端已經(jīng)關(guān)閉,workerman 在send時(shí)候發(fā)現(xiàn)客戶(hù)端連接關(guān)閉了,就釋放了連接,同時(shí)也釋放了連接緩沖區(qū)的數(shù)據(jù),導(dǎo)致后面的消息被丟棄。
實(shí)際上客戶(hù)端發(fā)送完數(shù)據(jù)應(yīng)該等待服務(wù)端確認(rèn)后才能關(guān)閉連接,不然數(shù)據(jù)無(wú)法確保能被服務(wù)端接收并處理。因?yàn)榘l(fā)送的數(shù)據(jù)可能在客戶(hù)端的socket緩沖區(qū),根本沒(méi)發(fā)送到服務(wù)端,也可能在服務(wù)端的緩沖區(qū),還沒(méi)被處理。
解決方法是客戶(hù)端發(fā)送完消息后要讀取服務(wù)端的返回,確定服務(wù)端收到數(shù)據(jù)再關(guān)閉連接。