指定客戶端發送到客戶端 出現錯誤 數據不能發送到指定客戶端
<?php
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Protocols\Websocket;
require_once '/mnt/d/linux/Autoloader.php';
// WebSocket service bound to 0.0.0.0:8882
$worker = new Worker('websocket://0.0.0.0:8882');
$worker->count = 1;
$worker->onWorkerStart = function ($worker) {
};
// Extra property that stores the uid => connection mapping
// (uid is the user id or some unique client identifier)
$worker->uidConnections = [];
$worker->lastuid = 999999999;
/**
 * Connection-close handler: removes the closing client from the
 * uid => connection registry kept in $worker->uidConnections.
 *
 * The previous version called unset() on the whole property, so a single
 * registered client disconnecting threw away the registry for every client.
 *
 * @param object $connection The connection being closed; only its optional
 *                           `uid` property is read.
 */
$worker->onClose = function($connection)
{
    global $worker;
    if (!isset($connection->uid) || !isset($worker->uidConnections)) {
        // Never registered under a uid, or no registry left: nothing to clean up.
        return;
    }
    $uid = $connection->uid;
    if (is_array($worker->uidConnections)) {
        // Remove only this client's entry, and only while it still points at
        // this very connection (the uid may have been re-registered since).
        if ((is_int($uid) || is_string($uid))
            && isset($worker->uidConnections[$uid])
            && $worker->uidConnections[$uid] === $connection) {
            unset($worker->uidConnections[$uid]);
        }
    } elseif ($worker->uidConnections === $connection) {
        // The property holds this connection directly rather than a map;
        // put it back to the empty registry it is initialised with.
        $worker->uidConnections = array();
    }
};
/**
 * Handles one message received from a websocket client.
 *
 * Steps, in the order the code performs them:
 *  1. json_decode() the payload into $msg.
 *  2. If the decoded value is truthy and the connection has no `uid` yet, store it
 *     as the connection's uid and record the connection on the worker.
 *  3. Create the shared (global) async TCP connection to tcp://127.0.0.1:6668 if it
 *     does not exist yet.
 *  4. Depending on $msg, build one or two JSON request texts, frame them and send
 *     them over the task connection.
 *  5. Assign the task connection's onMessage callback (which forwards replies to a
 *     websocket connection) and call connect().
 *
 * Frame layout as built below: one byte 8, one byte 0, a 16-bit length (pack "S"),
 * the four bytes 189,172,36,19, the JSON text, then NUL bytes so that the length
 * (JSON length + 8) becomes a multiple of 4.
 *
 * NOTE(review): this block looks like it lost text during copy/paste - confirm
 * against the original file before relying on it:
 *  - $msg is compared as a whole with command names and is also used as the value
 *    of every field ($Connid, $idx, $md_id, ...); array subscripts appear to be missing.
 *  - several JSON templates have nothing after a key's colon ("ml":, "report_ml":,
 *    "md_id":), which is not valid JSON as written.
 *  - unpack() results are compared with scalars without an index.
 *  - two offset expressions use `_` where `*` seems intended and do not parse.
 *
 * @param object $ws_connection Websocket connection the message arrived on.
 * @param string $message       Raw payload, passed to json_decode().
 */
$worker->onMessage = function ($ws_connection, $message){
global $worker;
$msg = json_decode ( $message, true );
// NOTE(review): presumably a field of $msg (e.g. its mi_id) was meant here - TODO confirm
$mi_id=$msg;
if($mi_id==true){
if(!isset($ws_connection->uid))
{
// Not verified yet: treat the first packet as the uid (no real verification is done, to keep the demo simple)
$ws_connection->uid = $mi_id;
/* Save the uid-to-connection mapping so a connection can be looked up by uid,
* which allows pushing data to a specific uid
*/
//echo "uid=".$ws_connection->uid."\r\n";
// NOTE(review): this replaces the whole property with one connection instead of adding an entry keyed by uid - confirm intent
$worker->uidConnections = $ws_connection;
//print_r($worker->uidConnections);
//return $ws_connection->send($data);
}else{
echo "888";
}
}
//print_r($msg);
// Open an async connection to the remote task service; the ip is the task service's ip: 127.0.0.1 when local, the LVS ip when clustered
/* if(!$task_connection){
$task_connection = new AsyncTcpConnection ( 'tcp://101.201.104.58:6668' );
} */
// The task connection is a global, so it is shared by every websocket client handled by this process
global $task_connection ;
if(!$task_connection){
$task_connection = new AsyncTcpConnection ( 'tcp://127.0.0.1:6668' );
}
//echo "<pre>";
//print_r($task_connection);
$task_connection->websocketType = Websocket::BINARY_TYPE_ARRAYBUFFER;
if($msg=="stop"){
$md_id=$msg;
// Stop
$arr = "{"
. "\n\t\"type\":\"stop\"," .
"\n\t\"md_id\":\"$md_id\"" .
"\n}";
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
// Add the 8 header bytes, then pad with NUL bytes up to a multiple of 4
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// Send the data
$res = $task_connection->send ( $bin_data );
}
// Send a request to fetch test data
if ($msg == "getmetadata") {
$Connid = $msg ;
$idx = $msg ;
$num = $msg ;
$meta_id=$msg;
//echo "meta_id="+$meta_id;
$arr = "{" . "\n\t\"type\":\"getmetadata\"," . "\n\t\"mi_id\":$Connid," . "\n\t\"idx\":$idx," . "\n\t\"meta_id\":$meta_id," . "\n\t\"n_rcrd\":$num" . "\n}";
$bin_body = $arr; // json_encode($arr);
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
//$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . pack ( "c", "0" );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// Send the data
$res = $task_connection->send ( $bin_data );
}
// Fetch data once per second
if ($msg == "sample") {
$Connid = $msg ;
$n_smpl = $msg ;
//$num = $msg ;
$meta_id=$msg;
//echo "meta_id="+$meta_id;
// NOTE(review): "ml" has no value in this template - presumably an array literal was lost
$arr = "{" . "\n\t\"type\":\"sample\"," . "\n\t\"mi_id\":$Connid," . "\n\t\"ml\":," . "\n\t\"n_smpl\":$n_smpl" . "\n}";
//echo "<pre>";
//print_r($arr);
$bin_body = $arr; // json_encode($arr);
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
//$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . pack ( "c", "0" );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// Send the data
$res = $task_connection->send ( $bin_data );
}
// Send login data (the request built here has type "setparam")
if ($msg == "setparam") {
// Task and parameter data
$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"report\":1," . "\n\t\"report_ml\":" . "\n}";
//echo $arr;
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body .$d;
//echo bin2hex($bin_data);
// Send the data
$res = $task_connection->send ( $bin_data );
}
// "getmdinfo" sends two requests: a login first, then the getmdinfo query
if($msg=="getmdinfo"){
// Log in
$md_id=$msg;
//print_r($md_id);
$arr = "{" . "\n\t\"type\":\"login\"," . "\n\t\"oprt_id\":1000". "\n}";
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
// Build a comma-separated list of quoted ids in $str
// NOTE(review): $md_id is an array after explode() and is concatenated without an index; $str is not used by the request below - confirm
$str="";
$md_id=explode(",", $md_id);
for($i=0;$i<count($md_id);$i++){
if($i==count($md_id)-1){
$str.="\"".$md_id."\"";
}else{
$str.="\"".$md_id."\",";
}
}
//echo $str;
// Get the device status
$arr = "{" . "\n\t\"type\":\"getmdinfo\"," . "\n\t\"md_id\":". "\n}";
//echo $arr;
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// Send the data
$res = $task_connection->send ( $bin_data );
}
/**
* Get statistics
*/
if($msg=="getmiinfo"){
$mi_id=$msg;
$ml=$msg;
$start_begin=$msg;
$start_num=$msg;
$rng_min=$msg;
$rng_max=$msg;
// The request carries the stat_* and/or rng_* fields depending on which of them are truthy
if($start_begin&&$rng_max){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"stat_begin\":".$start_begin."," .
"\n\t\"stat_num\":".$start_num."," .
"\n\t\"rng_min\":".$rng_min."," .
"\n\t\"rng_max\":".$rng_max."," .
"\n\t\"ml\":"
. "\n}";
//echo $arr;
}else{
if($rng_max&&$rng_min){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"rng_min\":".$rng_min."," .
"\n\t\"rng_max\":".$rng_max."," .
"\n\t\"ml\":"
. "\n}";
//echo $arr;
}else if($start_begin&&$start_num){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"stat_begin\":".$start_begin."," .
"\n\t\"stat_num\":".$start_num."," .
"\n\t\"ml\":"
. "\n}";
}else{
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"ml\":"
. "\n}";
}
//echo $arr;
}
//echo $arr."\r\n";
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
}
if($msg=="setmetadata"){
$idx=$msg;
$value=$msg;
$meta_id=$msg;
$mi_id=$msg;
$arr = "{" .
"\n\t\"type\":\"setmetadata\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"idx\":".$idx.",".
"\n\t\"meta_id\":".$meta_id.",".
"\n\t\"value\":".$value
. "\n}";
//echo $arr;
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
}
/**
* Start: sends a "setparam" request with report 0, then the "start" request
*/
if ($msg == "start") {
$si=$msg;
$md_id=$msg;
$ml=$msg;
// Modify parameters
//$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"operator_id\":1000," . "\n\t\"report\":1," . "\n\t\"report_ml\":" . "\n}";
/* $arr = "{" .
"\n\t\"type\":\"setparam\"," .
"\n\t\"report\":1," .
"\n\t\"report_ml\":"
. "\n}"; */
$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"report\":0," . "\n\t\"report_ml\":" . "\n}";
//echo $arr;
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
// start00000003000000010102030405060708
$arr = "{" . "\n\t\"type\":\"start\"," . "\n\t\"md_id\":\"".$md_id."\"," . "\n\t\"si\":".$si."," . "\n\t\"max_idx\": 1048576," . "\n\t\"ml\":" . "\n}";
//echo "start=".$arr;
// Build the packet body
$bin_body = $arr;
// Body length
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// Build the header
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body .$d;
//echo bin2hex($bin_data);
// Send the data
$res = $task_connection->send ( $bin_data );
}
//file_put_contents ( "test1.txt", $arr . "\r\n", FILE_APPEND );
// Receive the result asynchronously.
// The callback is re-assigned on every websocket message and captures the connection that sent
// that message, so replies handled through $ws_connection go to the most recent sender.
$task_connection->onMessage = function ($task_connection, $task_result) use($ws_connection) {
global $worker;
//file_put_contents ( "test3.txt", $task_result . "\r\n", FILE_APPEND );
// First two bytes of the reply, then a 16-bit value at offset 2
$aa = unpack ( 'C*', substr ( $task_result, 0, 2 ) );
$bb = unpack ( 'S', substr ( $task_result, 2, 2 ) );
// NOTE(review): unpack() returns an array, so this comparison (and the ones on $dd, $n_rcrd, $meta_id below) presumably needs an index such as [1] - TODO confirm
if ($aa == "7") {
// Binary reply, answered to the client as "getmetadata_reply"
$cc = unpack ( 'S', substr ( $task_result, 8, 2 ) );
$dd = unpack ( 'S', substr ( $task_result, 16, 2 ) );
if ($dd > 0) {
$nmd = $dd ;
//echo "meta_num=".$nmd."\r\n";
$indexcount = 0;// running sum of the record counts ($n_rcrd) seen so far
$list=array();
// One 8-byte descriptor per entry starting at offset 20; idx, meta_id and n_rcrd are read at +2, +4 and +6
for($zz = 0; $zz < $nmd; $zz ++) {
$idx = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 2, 2 ) );
$meta_id = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 4, 2 ) );
$n_rcrd = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 6, 2 ) );
$teststr = "";
$metalist = null;
if ($n_rcrd > 0) {
$index = $n_rcrd ;
for($kk = 0; $kk < $index; $kk ++) {
/* echo "aaa=";
echo 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4;
echo "\r\n"; */
// NOTE(review): `_` appears where the neighbouring offset expressions use `*`; as written this is not a valid PHP expression - presumably 20 + 8 * $nmd + $indexcount * 4 + 4 + $kk * 4
$tmpid = unpack ( 'f', substr ( $task_result, 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4, 4 ) );
// Number of decimals kept depends on meta_id: 1 for ids 1, 2, 41; 3 for 3, 4, 17; 4 for 6; 2 otherwise
if($meta_id==1){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==2){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==41){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==3){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==4){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==6){
//ccc=floatnum(arr_,4);
//ccc=arr_;
//ccc=aaa.toFixed(4);
$tmpid=number_format($tmpid,4,".","");
}else if($meta_id==17){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==18){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==20){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else {
$tmpid=number_format($tmpid,2,".","");
}
//$metalist = $tmpid ;
// NOTE(review): plain assignment keeps only the last value; presumably an append/indexed write was meant - confirm
$metalist =$tmpid;
}
$indexcount = $indexcount + $n_rcrd ;
//echo "count=" . $indexcount . "\r\n";
//echo "index=" . $n_rcrd . "\r\n";
$list = $metalist;
$teststr = json_encode ( $list );
}
// NOTE(review): each of these assignments overwrites $list; keyed writes look intended - confirm
$list = $meta_id ;
$list = $idx ;
//file_put_contents ( "test3.txt", " type:" . $aa . "\r\n" . " flags:" . $aa . "\r\n" . " plen:" . $bb . "\r\n" . " mi_id:" . $cc . "\r\n" . " n_mb:" . $dd . "\r\n" . " idx:" . $idx . "\r\n" . " meta_id:" . $meta_id . "\r\n" . " n_rcrd:" . $n_rcrd . "\r\n" . " Meta:" . $teststr . "\r\n", FILE_APPEND );
}
$arr=array("type"=>"getmetadata_reply","data"=>$list);
//echo "<pre>";
//print_r($arr);
$s=json_encode($arr);
$mid=$cc;
//echo "mid=".$mid."\r\n";
// NOTE(review): $mid is computed just above but not used; presumably the lookup was keyed by it - confirm
$ws_connection=$worker->uidConnections;
/* if($s==null){
$s=1;
} */
$ws_connection->send ( $s );
}
} else if($aa == "11"){
// Binary reply parsed the same way as above, answered to the client as "sample_reply"
$cc = unpack ( 'S', substr ( $task_result, 8, 2 ) );
$dd = unpack ( 'S', substr ( $task_result, 16, 2 ) );
if ($dd > 0) {
$nmd = $dd ;
//echo "meta_num=".$nmd."\r\n";
$indexcount = 0;// running sum of the record counts ($n_rcrd) seen so far
$list=array();
for($zz = 0; $zz < $nmd; $zz ++) {
$idx = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 2, 2 ) );
$meta_id = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 4, 2 ) );
$n_rcrd = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 6, 2 ) );
$teststr = "";
$metalist = null;
if ($n_rcrd > 0) {
$index = $n_rcrd ;
for($kk = 0; $kk < $index; $kk ++) {
/* echo "aaa=";
echo 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4;
echo "\r\n"; */
// NOTE(review): same `_` vs `*` problem as in the type-7 branch - not valid PHP as written
$tmpid = unpack ( 'f', substr ( $task_result, 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4, 4 ) );
//$metalist = $tmpid ;
//$tmpid=number_format($tmpid,5,".","");
if($meta_id==1){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==2){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==41){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==3){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==4){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==6){
//ccc=floatnum(arr_,4);
//ccc=arr_;
//ccc=aaa.toFixed(4);
$tmpid=number_format($tmpid,4,".","");
}else if($meta_id==17){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==18){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==20){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else {
$tmpid=number_format($tmpid,2,".","");
}
//echo $meta_id."\r\n";
$metalist =$tmpid;
}
$indexcount = $indexcount + $n_rcrd ;
//echo "count=" . $indexcount . "\r\n";
//echo "index=" . $n_rcrd . "\r\n";
$list = $metalist;
$teststr = json_encode ( $list );
}
$list = $meta_id ;
$list = $idx ;
//file_put_contents ( "test3.txt", " type:" . $aa . "\r\n" . " flags:" . $aa . "\r\n" . " plen:" . $bb . "\r\n" . " mi_id:" . $cc . "\r\n" . " n_mb:" . $dd . "\r\n" . " idx:" . $idx . "\r\n" . " meta_id:" . $meta_id . "\r\n" . " n_rcrd:" . $n_rcrd . "\r\n" . " Meta:" . $teststr . "\r\n", FILE_APPEND );
}
$arr=array("type"=>"sample_reply","data"=>$list);
//echo "<pre>";
//print_r($arr);
$s=json_encode($arr);
//$s=json_encode($arr);
//echo "<pre>";
//print_r($arr);
/* if($s==null){
$s=1;
} */
// Send to the captured connection when it has no uid, otherwise to whatever the worker has stored
if(!isset($ws_connection->uid)){
$ws_connection->send ( $s );
}else{
$ws_connection=$worker->uidConnections;
$ws_connection->send ( $s );
}
}
}else {
// Any other reply: take the text from offset 8 up to the last '}', strip formatting whitespace, re-encode it and forward it
$a = substr ( $task_result, 8, strrpos ( $task_result, '}' ) - 7 );
//echo $a."\r\n";
$s = str_replace ( "\n\t", "", $a );
$s = str_replace ( " ", "", $s );
//$s = str_replace ( "\r", "", $s );
//$s = str_replace ( "}]}]", "}]", $s );
$s = str_replace ( "}]\r\n}]", "}]", $s );
$s = str_replace ( "}\r\n}", "}", $s );
$s1 = json_decode ( $s ,true);
$s = json_decode ( $s);
// NOTE(review): empty branch; presumably the reply's "type" field was being checked - confirm
if($s1=="start_reply"){
}
$s = json_encode ( $s );
//echo $s."\r\n";
//$ws_connection=$worker->uidConnections;
$ws_connection->send ( $s);
}
// Result
// var_dump ( $task_result );
// var_dump ( $s );
// Remember to close the async connection after getting the result
// $task_connection->close();
// Notify the corresponding websocket client that the task is done
// if($s)
/* $data = $s;
if ($aa == "7") {
}else{
} */
};
// Perform the async connect (called on every websocket message)
$task_connection->connect ();
};
// Run every worker configured above
Worker::runAll();
?>