如何實(shí)現(xiàn)異步任務(wù)
問:
如何異步處理繁重的業(yè)務(wù),避免主業(yè)務(wù)被長時(shí)間阻塞。例如我要給1000用戶發(fā)送郵件,這個(gè)過程很慢,可能要阻塞數(shù)秒,這個(gè)過程中因?yàn)橹髁鞒瘫蛔枞?,會影響后續(xù)的請求,如何將這樣的繁重任務(wù)交給其它進(jìn)程異步處理。
答:
可以在本機(jī)或者其它服務(wù)器甚至服務(wù)器集群預(yù)先建立一些任務(wù)進(jìn)程處理繁重的業(yè)務(wù),任務(wù)進(jìn)程數(shù)可以開多一些,例如cpu的10倍,然后調(diào)用方利用AsyncTcpConnection將數(shù)據(jù)異步發(fā)送給這些任務(wù)進(jìn)程異步處理,異步得到處理結(jié)果。
任務(wù)進(jìn)程服務(wù)端
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// task worker,使用Text協(xié)議
$task_worker = new Worker('Text://0.0.0.0:12345');
// task進(jìn)程數(shù)可以根據(jù)需要多開一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// 假設(shè)發(fā)來的是json數(shù)據(jù)
$task_data = json_decode($task_data, true);
// 根據(jù)task_data處理相應(yīng)的任務(wù)邏輯.... 得到結(jié)果,這里省略....
$task_result = ......
// 發(fā)送結(jié)果
$connection->send(json_encode($task_result));
};
Worker::runAll();
在workerman中調(diào)用
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// websocket服務(wù)
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// 與遠(yuǎn)程task服務(wù)建立異步連接,ip為遠(yuǎn)程task服務(wù)的ip,如果是本機(jī)就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// 任務(wù)及參數(shù)數(shù)據(jù)
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 發(fā)送數(shù)據(jù)
$task_connection->send(json_encode($task_data));
// 異步獲得結(jié)果
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// 結(jié)果
var_dump($task_result);
// 獲得結(jié)果后記得關(guān)閉異步連接
$task_connection->close();
// 通知對應(yīng)的websocket客戶端任務(wù)完成
$ws_connection->send('task complete');
};
// 執(zhí)行異步連接
$task_connection->connect();
};
Worker::runAll();
這樣,繁重的任務(wù)交給本機(jī)或者其它服務(wù)器的進(jìn)程去做,任務(wù)完成后會異步收到結(jié)果,業(yè)務(wù)進(jìn)程就不會阻塞了。