<?php
use Workerman\Worker;
require_once './../Workerman/Autoloader.php';
// 初始化一個(gè)worker容器,監(jiān)聽1234端口
global $worker;
$worker = new Worker('tcp://0.0.0.0:1235');
// 這里進(jìn)程數(shù)必須設(shè)置為1
$worker->count = 1;
// 新增加一個(gè)屬性,用來保存設(shè)備id到connection的映射
$worker->deviceIdConnections = array();
// 當(dāng)有設(shè)備發(fā)來消息時(shí)執(zhí)行的回調(diào)函數(shù)
$worker->onMessage = function($connection, $data)use($worker)
{
// 判斷當(dāng)前設(shè)備是否已經(jīng)驗(yàn)證,既是否設(shè)置了device_id
if(!isset($connection->device_id))
{
// 沒驗(yàn)證的話把第一個(gè)包當(dāng)做device_id(這里為了方便演示,沒做真正的驗(yàn)證)
$device_id = $data;
$connection->device_id = $device_id;
/* 保存device_id到connection的映射,這樣可以方便的通過device_id查找connection,
* 實(shí)現(xiàn)針對(duì)特定device_id推送數(shù)據(jù)
*/
$old_connection = isset($worker->deviceIdConnections[$device_id]) ? $worker->deviceIdConnections[$device_id] : null;
$worker->deviceIdConnections[$device_id] = $connection;
if ($old_connection) {
$old_connection->close();
} else {
// ====設(shè)備上線====
echo "$device_id 上線\n";
}
return;
}
};
// 當(dāng)有設(shè)備連接斷開時(shí)
$worker->onClose = function($connection)use($worker)
{
global $worker;
if (!isset($connection->device_id) || !isset($worker->deviceIdConnections[$connection->device_id])) {
return;
}
$device_id = $connection->device_id;
$connection_online = $worker->deviceIdConnections[$device_id];
// 連接斷開時(shí)刪除映射
if ($connection->id == $connection_online->id) {
unset($worker->deviceIdConnections[$device_id]);
// === 設(shè)備下線 ===
echo "$device_id 下線\n";
}
};
// 啟動(dòng)
Worker::runAll();
workerman里利用onClose來監(jiān)控設(shè)備下線,例如上面的例子
<?php
namespace app\push\controller;
use think\Db;
use think\worker\Server;
use Workerman\Lib\Timer;
// 心跳間隔55秒
define('HEARTBEAT_TIME', 3);
class Worker extends Server
{
protected $socket = 'http://0.0.0.0:1235';
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// dump($connection);
$post = $data['request'];
dump($post['Data']['DeviceInfo']['DeviceUUID']);
switch ($post['Name']) {
case 'heartbeatRequest':
$return = array(
'Name' => 'heartbeatResponse',
'TimeStamp' => time(),
"Code" => 1,
"Message" => ""
);
$uuid = $post['Data']['DeviceInfo']['DeviceUUID'];
$connection->worker->name = $uuid;
$this->updateDeviceStatus($uuid, 1);
return $return;
break;
case 'captureInfoRequest':
echo 1;
break;
case 'resultRequest':
echo 3;
break;
}
}
/**
* 當(dāng)連接建立時(shí)觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onConnect($connection)
{
}
/**
* 當(dāng)連接斷開時(shí)觸發(fā)的回調(diào)函數(shù)
* @param $connection
*/
public function onClose($connection)
{
// dump($connection->worker);
$uuid = $connection->worker->name;
// dump($uuid);
$this->updateDeviceStatus($uuid, 0);
}
/**
* 當(dāng)客戶端的連接上發(fā)生錯(cuò)誤時(shí)觸發(fā)
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* 每個(gè)進(jìn)程啟動(dòng)
* @param $worker
*/
public function onWorkerStart($worker)
{
Timer::add(1, function () use ($worker) {
$time_now = time();
foreach ($worker->connections as $connection) {
echo "在線";
// 有可能該connection還沒收到過消息,則lastMessageTime設(shè)置為當(dāng)前時(shí)間
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通訊時(shí)間間隔大于心跳間隔,則認(rèn)為客戶端已經(jīng)下線,關(guān)閉連接
if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
echo "下線";
$connection->close();
}
}
});
}
/**
* note: 更改設(shè)備狀態(tài)
* user: 萬象初新
* date: 2020/7/28 10:04
*/
public function updateDeviceStatus($uuid, $status = 0)
{
return Db::name('julong_device')->where('uuid', $uuid)->update(array('is_online' => $status));
}
}