to:Dear walkor,
我的項(xiàng)目采用workerman的GW/BW模型,設(shè)置了4個(gè)GW和4個(gè)BW進(jìn)程,BW的onmessage收到客戶(hù)的命令消息后,要生成一個(gè)全局的訂單編號(hào),然后生成訂單保存到MYSQL數(shù)據(jù)庫(kù)。測(cè)試的時(shí)候,客戶(hù)端開(kāi)了4個(gè)進(jìn)程不停地發(fā)送訂單命令,就會(huì)卡住了,其他新的客戶(hù)端不能登錄了,GW進(jìn)程運(yùn)行只能正常能收到消息,但BW卡住了。
我現(xiàn)在要開(kāi)新的worker進(jìn)程專(zhuān)門(mén)處理繁重的業(yè)務(wù),包括生成訂單和保存訂單等,該怎么做?是收到命令的時(shí)候?qū)崟r(shí)建立進(jìn)程嗎?
可以在本機(jī)或者其它服務(wù)器甚至服務(wù)器集群預(yù)先建立一些任務(wù)進(jìn)程處理繁重的業(yè)務(wù),任務(wù)進(jìn)程數(shù)可以開(kāi)多一些,例如cpu的10倍。
任務(wù)進(jìn)程服務(wù)端
// task worker,使用Text協(xié)議
$task_worker = new Worker('Text://0.0.0.0:12345');
// task進(jìn)程數(shù)可以根據(jù)需要多開(kāi)一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function($connection, $task_data)
{
// 假設(shè)發(fā)來(lái)的是json數(shù)據(jù)
$task_data = json_decode($task_data, true);
// 根據(jù)task_data處理相應(yīng)的任務(wù)邏輯.... 得到結(jié)果,這里省略....
$task_result = ......
// 發(fā)送結(jié)果
$connection->send(json_encode($task_result));
};
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
在workerman中調(diào)用
use \Workerman\Connection\AsyncTcpConnection;
....
// 與遠(yuǎn)程task服務(wù)建立異步鏈接,ip為遠(yuǎn)程task服務(wù)的ip,如果是本機(jī)就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://ip:12345');
// 任務(wù)及參數(shù)數(shù)據(jù)
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 發(fā)送數(shù)據(jù)
$task_connection->send(json_encode($task_data));
// 異步獲得結(jié)果
$task_connection->onMessage = function($task_connection, $task_result)
{
// 結(jié)果
var_dump($task_result);
// 獲得結(jié)果后記得關(guān)閉鏈接
$task_connection->close();
};
// 執(zhí)行異步鏈接
$task_connection->connect();
這樣,繁重的任務(wù)交給本機(jī)或者其它服務(wù)器的進(jìn)程去做,任務(wù)完成后會(huì)異步收到結(jié)果,BussinessWorker進(jìn)程就不會(huì)阻塞了
你好,我使用$task_connection->connect();報(bào)錯(cuò) Fatal error: Call to a member function add() on null;這個(gè)是什么原因
謝謝老大的精彩解答,我明白怎么做了。
還有個(gè)小小的疑問(wèn):高并發(fā)的狀態(tài)下,大量動(dòng)態(tài)new AsyncTcpConnection('Text://ip:12345');會(huì)不會(huì)性能影響比較大?
追加個(gè)問(wèn)題:如果業(yè)務(wù)處理時(shí)間比較長(zhǎng),需不需要在異步客戶(hù)端和處理進(jìn)程服務(wù)端兩端加心跳?分布式部署的方式
任務(wù)進(jìn)程服務(wù)端這個(gè)程序,假如開(kāi)了10個(gè)task進(jìn)程,每隔task進(jìn)程都在忙,是不是其它的請(qǐng)求就會(huì)被卡???
任務(wù)進(jìn)程服務(wù)端這個(gè)程序,假如開(kāi)了10個(gè)task進(jìn)程,每隔task進(jìn)程都在忙,是不是其它的請(qǐng)求就會(huì)被卡???
看情況
假設(shè)有10個(gè)task進(jìn)程處理耗時(shí)任務(wù)
假如workerman會(huì)收到 聊天 和 發(fā)郵件 兩種請(qǐng)求,發(fā)郵件 是耗時(shí)操作,如果在當(dāng)前進(jìn)程執(zhí)行,則會(huì)影響當(dāng)前進(jìn)程所有的后續(xù)請(qǐng)求,這時(shí)我們就轉(zhuǎn)到task進(jìn)程去異步去運(yùn)行,執(zhí)行完畢后異步通知客戶(hù)端。
而轉(zhuǎn)發(fā)聊天消息是非常快的,workerman框架本身異步非阻塞IO不會(huì)造成任何阻塞或者卡住進(jìn)程的情況,加上耗時(shí)的任務(wù)都交給了task進(jìn)程處理,所以聊天消息請(qǐng)求不會(huì)受到任何影響,即使task進(jìn)程全部在忙于發(fā)郵件
再看task進(jìn)程
如果瞬時(shí)來(lái)了10個(gè)發(fā)郵件請(qǐng)求,那么10個(gè)task進(jìn)程同時(shí)運(yùn)行5秒(假設(shè)每個(gè)發(fā)郵件請(qǐng)求執(zhí)行5秒)才能全部運(yùn)行完成,那么在運(yùn)行期間,如果再次收到發(fā)郵件請(qǐng)求,則這個(gè)請(qǐng)求會(huì)在task進(jìn)程排隊(duì),也就是等待約5秒后才能輪到這個(gè)發(fā)郵件的請(qǐng)求在task進(jìn)程執(zhí)行,也就是這個(gè)請(qǐng)求從接收到處理完畢耗時(shí)約10秒,但是這個(gè)是task進(jìn)程的阻塞,不會(huì)影響主流程的運(yùn)行,比如不影響聊天請(qǐng)求
如果task進(jìn)程運(yùn)行的任務(wù)繁重(長(zhǎng)時(shí)間阻塞),并且有不少的繁重任務(wù)請(qǐng)求量,則可以開(kāi)啟更多的task進(jìn)程處理,比如500個(gè)task進(jìn)程差不多可以每秒處理100個(gè)耗時(shí)5秒的請(qǐng)求,單臺(tái)服務(wù)器無(wú)法滿(mǎn)足時(shí)(通常是cpu 100%),可以增加服務(wù)器實(shí)現(xiàn)集群處理。