錯誤提示:
error package. package_length=1195725856
使用workerman與TP5.0的整合方法,分別建立了webscoket服務與Channel的服務端、http服務,啟動客戶端連接。
兩個問題:
1、Channel客戶端與服務端連接失?。?br />
2、error package. package_length=1195725856。
原計劃嘗試將這個功能嵌入到tp5.0里面
https://www.kancloud.cn/walkor/workerman/315202
個人修改后代碼,(剛入門菜鳥,輕噴。)
貼上原代碼:
tp5.0 :文件路徑1:vendor/topthink/think-worker/src Worker.php
namespace think\worker;
use Workerman\Worker;
use Channel\Client;
use Channel\Server as Channelserver;
/**
* Worker控制器擴展類
* Channel相關文件:Client.php和.Server.php,安裝在tp根目錄:extend/Channel下
*/
abstract class Server
{
/**
* -----------------------------------------------------------------------------------------------------------
*
*/
protected $worker;
protected $socket = '';
protected $protocol = 'http';
protected $host = '0.0.0.0';
protected $port = '2346';
protected $processes = 4; //進程個數(shù)
protected $name = "ApiWebSocket"; //進程名稱
protected $daemonize = false; //表示是否以daemon(守護進程)方式運行
protected $stdoutFile = LOG_PATH."workman_putout.log"; //所有的打印輸出全部保存在日志文件workman_putout.log文件中
protected $logFile = LOG_PATH."workman_start_stop.log"; //此文件記錄了workerman自身相關的日志,包括啟動、停止等
//Channel服務擴展
protected $channelServer = false; //是否開啟Channel服務,默認false,不開啟服務
protected $channelHost = '0.0.0.0'; //是否開啟Channel服務,默認false,不開啟服務
protected $channelPort = 2206; //是否開啟Channel服務,默認false,不開啟服務
/**
* 架構函數(shù)
* @access public
*/
public function __construct()
{
//是否啟動Channel服務,默認不開啟
if($this->channelServer){
$channel_server = new Channelserver($this->channelHost, $this->channelPort);
}
// 實例化 Websocket 服務
$this->worker = new Worker($this->socket ?: $this->protocol . '://' . $this->host . ':' . $this->port);
// 設置進程數(shù)
$this->worker->count = $this->processes;
$this->worker->name = $this->name;
// 初始化
$this->init();
// 設置回調
foreach ( as $event) {
if (method_exists($this, $event)) {
$this->worker->$event = ;
}
}
// Run worker
Worker::runAll();
}
protected function init()
{
//輸出日志文件
if (!file_exists($this->stdoutFile)) { // 如果不存在則創(chuàng)建
file_put_contents($this->stdoutFile, '');
// 檢測是否有權限操作
if (!is_writeable($this->stdoutFile)) chmod($this->stdoutFile, 0777); // 如果無權限,則修改為0777最大權限
};
//開啟、關閉日志文件
if (!file_exists($this->logFile)) { // 如果不存在則創(chuàng)建
file_put_contents($this->logFile, '');
// 檢測是否有權限操作
if (!is_writeable($this->logFile)) chmod($this->logFile, 0777); // 如果無權限,則修改為0777最大權限
};
Worker::$daemonize = $this->daemonize;
Worker::$stdoutFile = $this->stdoutFile;
Worker::$logFile = $this->logFile;
}
}
文件2:websocket 服務端 啟動Channel服務端 路徑:application/push/controller/Worker.php
<?php
namespace app\push\controller;
use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;
class Worker extends Server
{
protected $socket = 'websocket://api.3-gd.com:2886';
protected $processes = 2;
protected $name = "pusher"; //進程名稱
protected $channelServer = true; //是否開啟Channel服務,默認false,不開啟服務
protected $channelHost = 'api.3-gd.com'; //是否開啟Channel服務,默認false,不開啟服務
protected $channelPort = 2207; //是否開啟Channel服務,默認false,不開啟服務
/**
* 數(shù)據(jù)傳輸內容格式要求
* a、必須包含用戶ID、用戶信息、請求路徑或請求路徑、請求或傳輸?shù)膮?shù)、返回數(shù)據(jù)的處理方法
* b、參數(shù)全部為json字符串,接收后再處理為數(shù)組對象或json對象
* 接收數(shù)據(jù)后,此頁面僅做分發(fā)和接收處理,其余數(shù)據(jù)由對應的模塊功能進行處理
*
* 返回數(shù)據(jù)內容
* "send_msg" :返回的提示主要信息
* "send_desc":返回的主要描述信息
* "type" :返回的數(shù)據(jù)處理方式,如"message":一般提示信息、"alarm":重要提示信息、"data":接口返回數(shù)據(jù)
* "fun" :建議的數(shù)據(jù)處理方法及函數(shù)
*
*/
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// 給connection臨時設置一個lastMessageTime屬性,用來記錄上次收到消息的時間
$connection->lastMessageTime = time();
// 處理接收到的數(shù)據(jù)
$input_info = json_decode( $data, true );
// 判斷接收的數(shù)據(jù)是否為合法數(shù)據(jù)
if(!$input_info||!$input_info||!$input_info||!$input_info||!$input_info){
$arr = ;
$connection->send( json_encode($arr) );
}
// 將判斷后的數(shù)據(jù)交給對應的控制器去處理
else{
//選擇控制器并獲取操作結果
$user_id = $input_info;
$user_info = $input_info;
$user_mvc = $input_info;
$user_data = $input_info;
$user_fun = $input_info;
//將數(shù)據(jù)交給其他模型處理
$do_action = model('push/PushComm');
$res = $do_action->sendDataToMVC($user_id,$user_info,$user_mvc,$user_data);
$arr=;
//將結果返回給客戶端
$connection->send( json_encode($arr) );
}
}
/**
* 當連接建立時觸發(fā)的回調函數(shù)
* @param $connection
*/
public function onConnect($connection)
{
}
/**
* 當連接斷開時觸發(fā)的回調函數(shù)
* @param $connection
*/
public function onClose($connection)
{
}
/**
* 當客戶端的連接上發(fā)生錯誤時觸發(fā)
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* 每個進程啟動
* @param $worker
*/
public function onWorkerStart($worker)
{
// 心跳間隔60秒
define('HEARTBEAT_TIME', 60);
// 鏈接保持提醒10秒
define('ALARM_TIME', 10);
Timer::add(1, function()use($worker){
$time_now = time();
foreach($worker->connections as $connection) {
// 有可能該connection還沒收到過消息,則lastMessageTime設置為當前時間
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
//$connection->send(time());
// 鏈接保持提醒時間
if ($time_now - $connection->lastMessageTime == HEARTBEAT_TIME - ALARM_TIME) {
//發(fā)送命令消息,提示應該重新發(fā)送消息,保持鏈接
$arr = ;
$connection->send( json_encode($arr) );
continue;
}
// 上次通訊時間間隔大于心跳間隔,則認為客戶端已經下線,關閉連接
if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
$connection->close();
}
}
});
//連接Channel服務
// Channel客戶端連接到Channel服務端
Client::connect( $this->channelHost, $this->channelPort);
// 以自己的進程id為事件名稱
$event_name = $worker->id;
// 訂閱worker->id事件并注冊事件處理函數(shù)
Client::on($event_name, function($event_data)use($worker){
$to_connection_id = $event_data;
$message = $event_data;
if(!isset($worker->connections))
{
echo "connection not exists\n";
return;
}
$to_connection = $worker->connections;
$to_connection->send($message);
});
// 訂閱廣播事件
$event_name = '廣播';
// 收到廣播事件后向當前進程內所有客戶端連接發(fā)送廣播數(shù)據(jù)
Client::on($event_name, function($event_data)use($worker){
$message = $event_data;
foreach($worker->connections as $connection)
{
$connection->send($message);
}
});
}
}
文件3:http 服務端 啟動Channel客戶端 路徑:application/push/controller/Workerserver.php
<?php
namespace app\push\controller;
use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;
/**
* 基于Worker的多進程(分布式集群)推送系統(tǒng)
* 可以實現(xiàn)服務端后臺任務向用戶推送數(shù)據(jù)
* 后臺用戶定期執(zhí)行某操作等
* 實現(xiàn)雙向通訊
* -------------------------------特別說明---------------------------------
* Channel相關文件:Client.php和.Server.php,安裝在tp根目錄:extend/Channel下
*
* 用來處理http請求,向任意客戶端推送數(shù)據(jù),需要傳workerID和connectionID
*
* 依賴WorkMan
*/
class Workerserver extends Server
{
protected $socket = 'http://api.3-gd.com:2887';
protected $processes = 1;
protected $name = "publisher"; //進程名稱
protected $channelServer = false; //是否開啟Channel服務,默認false,不開啟服務
protected $channelHost = 'api.3-gd.com'; //是否開啟Channel服務,默認false,不開啟服務
protected $channelPort = 2207; //是否開啟Channel服務,默認false,不開啟服務
// 用來處理http請求,向任意客戶端推送數(shù)據(jù),需要傳workerID和connectionID
/**
* 每個進程啟動
* @param $worker
*/
public function onWorkerStart($worker)
{
//連接Channel服務
// Channel客戶端連接到Channel服務端
Client::connect( $this->channelHost, $this->channelPort);
}
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
$connection->send('ok');
if(empty($_GET)) return;
// 是向某個worker進程中某個連接推送數(shù)據(jù)
if(isset($_GET) && isset($_GET))
{
$event_name = $_GET;
$to_connection_id = $_GET;
$content = $_GET;
Client::publish($event_name, array(
'to_connection_id' => $to_connection_id,
'content' => $content
));
}
// 是全局廣播數(shù)據(jù)
else
{
$event_name = '廣播';
$content = $_GET;
Client::publish($event_name, array(
'content' => $content
));
}
}
/**
* 當客戶端的連接上發(fā)生錯誤時觸發(fā)
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
}
求各位大神指點一下,謝謝