walkor大神,目前需求是這樣的:
有一群商家在后臺(tái)網(wǎng)頁處理批量導(dǎo)入產(chǎn)品 -》 服務(wù)器接受請(qǐng)求 -》 開始foreach一個(gè)一個(gè)處理導(dǎo)入請(qǐng)求;
我現(xiàn)在想每成功導(dǎo)入一個(gè)就推送到前臺(tái)顯示已經(jīng)導(dǎo)入成功,直到全部導(dǎo)入自動(dòng)結(jié)束推送。
看了聊天室代碼,消息推送都是靠前端js+event.php,我想直接在php里面不需要onMessage觸發(fā).
我從下午看到現(xiàn)在文檔,也看了很多問答,依然非常糊涂,不奢望給整段代碼,但是希望walkor大神給點(diǎn)思路。
后端代碼
push.php
<?php
use Workerman\Worker;
require_once './Workerman/Autoloader.php';
// 初始化一個(gè)worker容器,監(jiān)聽1234端口
global $worker;
$worker = new Worker('websocket://0.0.0.0:1234');
// 這里進(jìn)程數(shù)必須設(shè)置為1
$worker->count = 1;
// worker進(jìn)程啟動(dòng)后建立一個(gè)內(nèi)部通訊端口
$worker->onWorkerStart = function($worker)
{
// 開啟一個(gè)內(nèi)部端口,方便內(nèi)部系統(tǒng)推送數(shù)據(jù),Text協(xié)議格式 文本+換行符
$inner_text_worker = new Worker('Text://0.0.0.0:5678');
$inner_text_worker->onMessage = function($connection, $buffer)
{
global $worker;
// $data數(shù)組格式,里面有uid,表示向那個(gè)uid的頁面推送數(shù)據(jù)
$data = json_decode($buffer, true);
$uid = $data['uid'];
// 通過workerman,向uid的頁面推送數(shù)據(jù)
$ret = sendMessageByUid($uid, $data['percent']);
// 返回推送結(jié)果
$connection->send($ret ? 'ok' : "uid $uid not online");
};
$inner_text_worker->listen();
};
// 新增加一個(gè)屬性,用來保存uid到connection的映射
$worker->uidConnections = array();
// 當(dāng)有客戶端發(fā)來消息時(shí)執(zhí)行的回調(diào)函數(shù)
$worker->onMessage = function($connection, $data)use($worker)
{
// 判斷當(dāng)前客戶端是否已經(jīng)驗(yàn)證,既是否設(shè)置了uid
if(!isset($connection->uid))
{
// 沒驗(yàn)證的話把第一個(gè)包當(dāng)做uid(這里為了方便演示,沒做真正的驗(yàn)證)
$connection->uid = $data;
/* 保存uid到connection的映射,這樣可以方便的通過uid查找connection,
* 實(shí)現(xiàn)針對(duì)特定uid推送數(shù)據(jù)
*/
$worker->uidConnections[$connection->uid] = $connection;
return;
}
};
// 當(dāng)有客戶端連接斷開時(shí)
$worker->onClose = function($connection)use($worker)
{
global $worker;
if(isset($connection->uid))
{
// 連接斷開時(shí)刪除映射
unset($worker->uidConnections[$connection->uid]);
}
};
// 向所有驗(yàn)證的用戶推送數(shù)據(jù)
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// 針對(duì)uid推送數(shù)據(jù)
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
return true;
}
return false;
}
// 運(yùn)行所有的worker(其實(shí)當(dāng)前只定義了一個(gè))
Worker::runAll();
啟動(dòng)后端服務(wù)
php push.php start -d
前端接收推送的js代碼
var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
var uid = 'uid1';
ws.send(uid);
};
ws.onmessage = function(e){
alert(e.data);
};
后端推送消息的代碼
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 推送的數(shù)據(jù),包含uid字段,表示是給這個(gè)uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($data)."\n");
// 讀取推送結(jié)果
echo fread($client, 8192);
這里的uid不一定是用戶的id,也可以理解為任務(wù)id即 taskid
記得開放1234 5678 兩個(gè)端口的防火墻。如果是云服務(wù)器,還要開放這兩個(gè)端口的安全組。
以上代碼親測(cè)可以直接使用
@1 群主經(jīng)測(cè)試您給的后端代碼前端js,workerman推送代碼只對(duì)一個(gè)頁面有效,并不是對(duì)所有打開的頁面有效,我打算對(duì)所有頁面有效,我該怎么做呢?或者我如何設(shè)置成可以向所有打開頁面的uid推送。希望群主給點(diǎn)思路呀。
@1 群主push.php里面有broadcast方法向所有頁面推送,但是我前端js中怎樣把所有頁面uid傳同一個(gè)呢?如果所有頁面uid傳同一個(gè)的話那后端php代碼是不是不用向push.php發(fā)送指定的uid了呢?
@1 群主我補(bǔ)充一下我的應(yīng)用場(chǎng)景呀 我是在不同瀏覽器,或者不同的電腦下相同或不同的瀏覽器打開同一個(gè)頁面(網(wǎng)址一樣)讓他們都能推送,我已經(jīng)設(shè)置了定時(shí)刷新。我希望每個(gè)用戶打開這個(gè)界面都能定時(shí)看到推送。
@6607:Linux不能用應(yīng)該是防火墻沒設(shè)置這個(gè)端口導(dǎo)致的,在防火墻加這條規(guī)則,然后重啟防火墻就可以了:-A INPUT -p tcp -m tcp --dport 1234 -j ACCEPT,我剛好遇到這個(gè)問題,這樣解決的
哈啰 !push.php line 4: $worker->count = 1;為什么只能設(shè)置一個(gè)進(jìn)程啊
假如:
客戶端1連接進(jìn)程A
客戶端2連接進(jìn)程B
客戶端2無法直接通過進(jìn)程B給客戶端1發(fā)送數(shù)據(jù),因?yàn)榭蛻舳?屬于進(jìn)程A不屬于進(jìn)程B,B進(jìn)程控制不到客戶端1(要想兩個(gè)進(jìn)程之間通訊需要一些進(jìn)程間通訊手段,可以使用http://doc3.workerman.net/component/channel.html)。
所以所有客戶端都只能連接同一個(gè)進(jìn)程才能直接互相通訊,為了避免客戶端連到不同進(jìn)程,count設(shè)置為1。
執(zhí)行后端推送代碼時(shí),出現(xiàn) unable to connect, connect refused. 請(qǐng)問這是什么原因造成的尼?我已經(jīng)更換了多個(gè)端口進(jìn)行測(cè)試,依然是同樣的提示。還有什么測(cè)試方法和手段來找出原因嘛?
后端推送消息的代碼
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1, STREAM_CLIENT_CONNECT|STREAM_CLIENT_PERSISTENT);
// 推送的數(shù)據(jù),包含uid字段,表示是給這個(gè)uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($data)."\n");
// 讀取推送結(jié)果
echo fread($client, 8192);
首先,我先建一個(gè) ump worker , 再在 work 進(jìn)程啟動(dòng)后建立一個(gè)內(nèi)部通訊端口。然后,我再寫 php socket 腳本來反問內(nèi)部通訊端口,提示 connect refused。防火墻是否阻擋,端口是否被占用,服務(wù)是否開啟,客戶端鏈接的 ip 都這幾個(gè)問題都確認(rèn)沒有問題。
結(jié)果,卻是一直提示 connection refused.
use Workerman\Worker;
include './Workerman/Autoloader.php';
$worker = new Worker("udp://192.168.50.190:8800");
//var_dump($worker);
$worker->count = 1;
$worker->onWorkerStart = function ($worker)
{
// 用于 Laravel 與 Workerman 的內(nèi)部通訊
$inner_text_worker = new Worker ("text://192.168.50.190:5678");
// 接收到 Laravel 請(qǐng)求信息,就向設(shè)備發(fā)起 UDP 請(qǐng)求
$inner_text_worker->onMessage = function ($connection, $arr_data)
{
$connection->send('success udp');
};
};
// 接收 UDP 請(qǐng)求,如:心跳
$worker->onMessage = function ($connection, $data)
{
echo $data . "<br/>";
$client_ip = $connection->getRemoteIp();
$client_port = $connection->getRemotePort();
$connection->send(strrev($data));
};
}}
// 開啟內(nèi)部通訊端口打印輸出的對(duì)象:object(Workerman\Worker)#6 (24) {
=>
int(0)
=>
string(4) "none"
=>
int(1)
=>
string(0) ""
=>
string(0) ""
=>
bool(true)
=>
bool(false)
=>
NULL
=>
NULL
=>
object(Closure)#7 (1) {
=>
_RECURSION_
}
=>
NULL
=>
NULL
=>
NULL
=>
NULL
=>
NULL
=>
NULL
=>
string(3) "tcp"
=>
array(0) {
}
=>
string(0) ""
=>
string(37) "/Library/WebServer/Documents/camerawk"
=>
NULL
=>
string(26) "text://192.168.50.190:5678"
=>
resource(17) of type (stream-context)
=>
string(32) "000000000f4c6bca0000000043696ce7"
}
// php socket 腳本對(duì)內(nèi)部端口通訊發(fā)起請(qǐng)求,報(bào)錯(cuò):stream_socket_client(): unable to connect to tcp://192.168.50.190:5678 (Connection refused)
問題:如果我 new 的 worker 是監(jiān)聽是的 http 通訊 8080端口,當(dāng)有接收到信息時(shí),那么 workerman 的onMessage 方法執(zhí)行 $connection->send("message") 。
我的疑問時(shí) $connection->send("message") 發(fā)送的信息,會(huì)以什么端口,從服務(wù)器發(fā)出去呢?
$worker->onMessage = function($connection, $data) use ($worker)
{
$connection->send("返回信息");
};
上面的沒看得太明白,push.php是運(yùn)行在workman里面的 后端推送消息代碼是運(yùn)行在web后臺(tái)的對(duì)嗎,通過后端推送消息代碼調(diào)用push.php.是這個(gè)思路嗎
要實(shí)現(xiàn)我這種模式 問號(hào)部分該用什么方法實(shí)現(xiàn)呢
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 推送的數(shù)據(jù),包含uid字段,表示是給這個(gè)uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($data)."\n");
// 讀取推送結(jié)果
echo fread($client, 8192);
這里不是寫得很清楚嘛?
workerman 的文檔里面也有
http://doc3.workerman.net/315240
推薦使用 GatewayWorker 的方式
本質(zhì)就是進(jìn)程間通訊而已
$inner_text_worker = new Worker('Text://0.0.0.0:5678');
第一次運(yùn)行沒有問題,但第二次運(yùn)行
fwrite($client, json_encode($data)."\n");
這里會(huì)報(bào)錯(cuò),換一個(gè)端口運(yùn)行,又可以了,這是為什么呢?
use Yii;
use yii\console\Controller;
use \Workerman\Worker;
require_once 'vendor/autoload.php';
class WorkerController extends Controller {
public function actionPush(){
// 初始化一個(gè)worker容器,監(jiān)聽1234端口
require_once dirname(Yii::$app->basePath).'/vendor/workerman/workerman/Autoloader.php';
$worker = new Worker('websocket://127.0.0.1:1234');
// 這里進(jìn)程數(shù)必須設(shè)置為1
$worker->count = 1;
// worker進(jìn)程啟動(dòng)后建立一個(gè)內(nèi)部通訊端口
$worker->onWorkerStart = function($worker)
{
// 開啟一個(gè)內(nèi)部端口,方便內(nèi)部系統(tǒng)推送數(shù)據(jù),Text協(xié)議格式 文本+換行符
$inner_text_worker = new Worker('http://127.0.0.1:5678');
$inner_text_worker->onMessage = function($connection, $buffer)
{
global $worker;
// $data數(shù)組格式,里面有uid,表示向那個(gè)uid的頁面推送數(shù)據(jù)
$data = json_decode($buffer, true);
$uid = $data;
// 通過workerman,向uid的頁面推送數(shù)據(jù)
$ret = $this->sendMessageByUid($uid, $buffer);
// 返回推送結(jié)果
$connection->send($ret ? 'ok' : 'fail');
};
$inner_text_worker->listen();
};
// 新增加一個(gè)屬性,用來保存uid到connection的映射
$worker->uidConnections = array();
// 當(dāng)有客戶端發(fā)來消息時(shí)執(zhí)行的回調(diào)函數(shù)
$worker->onMessage = function($connection, $data)use($worker)
{
// 判斷當(dāng)前客戶端是否已經(jīng)驗(yàn)證,既是否設(shè)置了uid
if(!isset($connection->uid))
{
// 沒驗(yàn)證的話把第一個(gè)包當(dāng)做uid(這里為了方便演示,沒做真正的驗(yàn)證)
$connection->uid = $data;
/* 保存uid到connection的映射,這樣可以方便的通過uid查找connection,
* 實(shí)現(xiàn)針對(duì)特定uid推送數(shù)據(jù)
*/
$worker->uidConnections = $connection;
return;
}
};
// 當(dāng)有客戶端連接斷開時(shí)
$worker->onClose = function($connection)use($worker)
{
global $worker;
if(isset($connection->uid))
{
// 連接斷開時(shí)刪除映射
unset($worker->uidConnections);
}
};
// 運(yùn)行所有的worker(其實(shí)當(dāng)前只定義了一個(gè))
Worker::runAll();
}
// 向所有驗(yàn)證的用戶推送數(shù)據(jù)
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// 針對(duì)uid推送數(shù)據(jù)
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections))
{
$connection = $worker->uidConnections;
$connection->send($message);
return true;
}
return false;
}
}
stream_socket_client(): unable to connect to tcp://127.0.0.1:5678 (???????????????????????
public function pushma($msg){
header("Content-Type: text/html; charset=UTF-8");
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 推送的數(shù)據(jù),包含uid字段,表示是給這個(gè)uid推送
$data = array('uid'=>'uid1', 'percent'=>$msg);
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($data)."\n");
// 讀取推送結(jié)果
echo fread($client, 8192);
}
這是調(diào)用代碼
[attach]574[/attach]
這個(gè)斷開,是否還要用心跳來判斷,還是內(nèi)部自動(dòng)會(huì)判斷
如果沒發(fā)消息過來了,會(huì)不會(huì)就自己會(huì)斷開
這個(gè)demo心跳作用是用來防止鏈接由于長(zhǎng)時(shí)間不活躍被路由節(jié)點(diǎn)防火墻關(guān)閉。如果你預(yù)計(jì)這個(gè)鏈接要維持很長(zhǎng)時(shí)間(超過一分鐘),需要客戶端定時(shí)發(fā)一點(diǎn)數(shù)據(jù)給服務(wù)端,用來保持鏈接活躍。服務(wù)端onMessage里判斷下如果是心跳消息忽略即可。
如果鏈接預(yù)計(jì)低于一分鐘,可以不用發(fā)心跳。
@1:walkor大神,正如我昨天提的那個(gè)問題,如果后端消息不停推送,在onMessage里面是沒法收到消息的,只能等后端消息推送完畢后,才能接受到,可能這時(shí)候已經(jīng)超過了后端設(shè)置的心跳時(shí)間(比如1分鐘),就會(huì)在onclose主動(dòng)關(guān)閉客戶端連接,然后客戶端重新連接,這中間的數(shù)據(jù)就會(huì)丟失,這種情況怎么解決呢?
感覺很不穩(wěn)定,不知應(yīng)該怎么來調(diào)試
客戶端寫了心跳,比之前的沒心跳的,連接情況更差
消息返回成功的很少,而且即使返回成功,但客戶端,還是沒有接收到數(shù)據(jù)
以前一兩小時(shí),發(fā)生三四次失敗,現(xiàn)在是經(jīng)常失敗
心跳處理這樣可以吧?不需要到5678端口上吧
[attach]576[/attach]
寫的比較好,測(cè)試沒問題,很好解決了PHP客戶端與web socket端之間的數(shù)據(jù)監(jiān)聽通訊
我測(cè)試的時(shí)候也發(fā)現(xiàn)這個(gè)情況,但原因是因?yàn)闀r(shí)間長(zhǎng)了和服務(wù)器斷開連接了,這樣父進(jìn)程weibsocket里原來的uid1已經(jīng)刪除了,這個(gè)時(shí)候php客戶端發(fā)送數(shù)據(jù),數(shù)據(jù)其實(shí)服務(wù)端的子進(jìn)程接收到了,但由于沒有找到uid,所以php客戶端沒有收到回復(fù)。加上心跳,代碼邏輯上再優(yōu)化些應(yīng)該可以解決。
大佬們,
能不能問一下你這個(gè)問題的第三部分:后端推送消息的代碼
這一部分代碼是應(yīng)該寫在哪里哦?跟后端代碼寫一個(gè)類里面么?新手求指點(diǎn)。。。
看看這個(gè)workerman實(shí)戰(zhàn)視頻,保準(zhǔn)都會(huì)了
http://study.163.com/course/introduction/1005015012.htm?share=2&shareId=400000000388007
總體上該方案非常優(yōu)秀,框架大概二個(gè)部分,第一個(gè)是websocket作為服務(wù)端接收和發(fā)送消息的父進(jìn)程,不妨稱之為fatherWorker,比較巧妙的是在該對(duì)象里嵌套了一個(gè)TEXT協(xié)議的worker,不妨叫他childWorker,這個(gè)功能用一個(gè)PHP頁面就能解決。第二個(gè)是客戶端,該客戶端可以用本地建立一個(gè)簡(jiǎn)單網(wǎng)站實(shí)現(xiàn),用來登錄界面操作,里面兩個(gè)頁面,第一個(gè)頁面是用來和服務(wù)端的父進(jìn)程websocket建立連接,并發(fā)送UID的,這個(gè)頁面可以是普通的html頁面。第二個(gè)頁面是用來和服務(wù)端的子進(jìn)程TEXT協(xié)議建立連接的,其作用是發(fā)送UID編號(hào)和數(shù)據(jù),這里要注意的是兩個(gè)頁面的UID號(hào)必須一致,否則發(fā)送失敗。
這樣,三個(gè)頁面,實(shí)現(xiàn)了WEB服務(wù)、SOCKET服務(wù)和TEXT協(xié)議服務(wù)三者聯(lián)動(dòng)的效果,構(gòu)思巧妙,非常優(yōu)秀!
代碼我依樣畫葫蘆測(cè)試過了,沒有問題,感謝分享,我剛學(xué)習(xí)workerman,說些自己的理解,不當(dāng)之處大家指正。
workerman新手,根據(jù)自己入門經(jīng)歷的疑惑,講一下表面的小白使用問題,也是新手小白容易產(chǎn)生的疑惑:
可以確定,下載workerman最新版,不修改示例代碼,示例代碼是百分百可以在linux和windows都可以正常運(yùn)行的。
windows中出現(xiàn)的問題:
(1)、執(zhí)行順序!一定不能錯(cuò):第一步,php push.php,第二步,瀏覽器的頁面上運(yùn)行執(zhí)行js ,第三步,執(zhí)行stream_socket_client的5678端口的那段代碼
(2)、windows本機(jī)測(cè)試中,第二步的js,可以在任何一個(gè)瀏覽器的console中執(zhí)行,但是要注意,如果是打算把js放進(jìn)自己寫的html頁面,是無法用本地windows的apache服務(wù)訪問頁面的,原因應(yīng)該就是windows下php只能用一個(gè)進(jìn)程,而php進(jìn)程已經(jīng)被第一步占用。同樣,第三步,也不能用windows本機(jī)的apache請(qǐng)求頁面或者php命令行執(zhí)行php,可以通過telnet 127.0.0.1:5678的方式,輸入:{"uid":"uid1","percent":"2%"},瀏覽器js就可以收到了
(3)、這個(gè)代碼是可以調(diào)試的(好像是廢話)。可能對(duì)于小白而言,因?yàn)閷?duì)workerman陌生一時(shí)忘記了怎么調(diào)試。其實(shí)和普通的php開發(fā)一樣,可以直接echo/var_dump打印輸出,也可以記錄到文件里。
telnet 127.0.0.1 5678 的命令下,也可以調(diào)試5678接收第三步消息的信息:push.php代碼里,在$connection->send($ret ? 'ok' : 'fail');前,加上自己要調(diào)試的內(nèi)容send即可,如:$connection->send($buffer);
? ?!! push.php調(diào)試代碼修改后,一定要重新啟動(dòng)第一步(linux下restart/reload;windows下退出進(jìn)程,重新執(zhí)行),才能生效
? ? linux下執(zhí)行php push.php start 后面不加-d,不讓后臺(tái)運(yùn)行,可以查看打印出的調(diào)試輸出
(4)、一旦出現(xiàn)問題,檢查步驟:
? ? ? 1、telnet 127.0.0.1 1234
? ? ? ? ? ?telnet 127.0.0.1 5678
? ? ? ? ?如果不通,端口是否開啟,linux下檢查iptables
? ? ? 2、如果是第三步返回fail,一般是uid用戶連接斷開了,只需要瀏覽器重新執(zhí)行一下第二步j(luò)s,在重新執(zhí)行第三步
向端口推送數(shù)據(jù)的時(shí)候,返回的一致是false;
$client = stream_socket_client('tcp://127.0.0.1:2345',$error,$errmsg,1);
var_dump($client);
//推送的數(shù)據(jù),包含的uid字段,表示給這個(gè)用戶uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
fwrite($client, json_encode($data)."\n");
echo fread($client, 8192);
大佬呀,為啥向端口推送數(shù)據(jù)的時(shí)候,返回的一致是false呀
$client = stream_socket_client('tcp://127.0.0.1:2345',$error,$errmsg,1);
var_dump($client);
大家看看我寫的對(duì)著沒 通過http給客戶端推送tcp消息
https://github.com/phpyii/workerman-test/tree/master/test/tcp