直接start運(yùn)行,大約45秒(累計(jì)2W5+數(shù)據(jù)后)后拋出異常
Warning: stream_socket_client(): unable to connect to tcp://172.18.0.19:22345 (Address not available) in /var/www/html/vendor/workerman/workerman/Connection/AsyncTcpConnection.php on line 185
MQTT內(nèi)數(shù)據(jù)為
{"code":"LI2701_LS_HH","type":"BOOL","value":false,"timestamp":"1617874140"}
服務(wù)端代碼1
require_once __DIR__ . '/../../vendor/autoload.php';
use \Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
$worker->onWorkerStart = function(){
global $db;
$db = 0;
$mqtt = new \Workerman\Mqtt\Client('mqtt://127.0.0.1:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->subscribe('simp/nodes/#');
};
$mqtt->onMessage = function($topic, $content){
global $db;
// 與遠(yuǎn)程task服務(wù)建立異步連接,ip為遠(yuǎn)程task服務(wù)的ip,如果是本機(jī)就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:22345');
// 任務(wù)及參數(shù)數(shù)據(jù)
// $task_data = array(
// 'function' => 'send_mail',
// 'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
// );
// 發(fā)送數(shù)據(jù)
$topic = explode("/", $topic);
$data = json_decode($content,true);
$data["count"] = $db;
$db++;
$task_connection->send(json_encode($data));
// 異步獲得結(jié)果
$task_connection->onMessage = function($task_connection, $task_result)
{
// 結(jié)果
// 獲得結(jié)果后記得關(guān)閉異步連接
$task_connection->close();
// 通知對(duì)應(yīng)的websocket客戶端任務(wù)完成
};
// 執(zhí)行異步連接
$task_connection->connect();
};
$mqtt->connect();
};
服務(wù)端代碼2
require_once __DIR__ . '/../../vendor/autoload.php';
use Workerman\Worker;
// task worker,使用Text協(xié)議
$task_worker = new Worker('Text://0.0.0.0:22345');
// task進(jìn)程數(shù)可以根據(jù)需要多開一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function($connection, $task_data)
{
// 假設(shè)發(fā)來(lái)的是json數(shù)據(jù)
$data = json_decode($task_data, true);
// 根據(jù)task_data處理相應(yīng)的任務(wù)邏輯.... 得到結(jié)果,這里省略....
echo $data["count"]."\n";
$task_result = "ok";
// 發(fā)送結(jié)果
$connection->send($task_result);
};
異常時(shí)執(zhí)行status