在gateway中使用AsyncTcpConnection把任務發(fā)給task_worker去處理,參考了異步任務文檔http://www.wtbis.cn/doc/workerman/faq/async-task.html
發(fā)現task_worker的onMessage一直無法觸發(fā),如果在send前加上connect就能觸發(fā)??吹揭灿腥颂釂栠^類似問題但是沒有解答
public static function onWorkerStart($businessWorker)
{
if ($businessWorker->id == 0) {
Timer::add($msgInterval, function () {
// 與遠程task服務建立異步連接,ip為遠程task服務的ip,如果是本機就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// $task_connection->connect();
// 發(fā)送數據
$task_connection->send(json_encode($taskData));
// 異步獲得結果
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) {
// 執(zhí)行結果
var_dump($task_result);
// 獲得結果后記得關閉異步連接
$task_connection->close();
};
});
}
}
// task worker,使用Text協(xié)議
$task_worker = new Worker('Text://0.0.0.0:12345');
// task進程數可以根據需要多開一些
$task_worker->count = 1;
// task進程名稱
$task_worker->name = 'TaskWorker';
$task_worker->onWorkerStart = function(Worker $worker)
{
echo "Worker starting...\n";
};
$task_worker->onConnect = function($connection)
{
echo "new connection from ip " . $connection->getRemoteIp() . "\n";
};
// 設置業(yè)務處理函數
$task_worker->onMessage = function($connection, $task_data)
{
// 假設發(fā)來的是json數據
$task_data = json_decode($task_data, true);
trace([$task_data,'msg'=>'發(fā)送后'],'crontab');
// 根據task_data處理相應的任務邏輯.... 得到結果,這里省略....
$task_result = 'done';
// 發(fā)送結果
$connection->send(json_encode($task_result));
};