開(kāi)啟10個(gè)進(jìn)程,每個(gè)進(jìn)程執(zhí)行不同的任務(wù)1、2、3、……10,然后主線程和子進(jìn)程通訊分別交互不同的信息,怎么實(shí)現(xiàn)?
子進(jìn)程A:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程B:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程C:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
主進(jìn)程和需要先和A通信,然后和B,然后和C,……有先后順序
你這種工作邏輯類似于流水線;
可以開(kāi)N個(gè)消費(fèi)者,通過(guò)隊(duì)列來(lái)實(shí)現(xiàn)你要的功能;
工作流:1隊(duì)列 --->>> 2隊(duì)列 --->>> 3隊(duì)列 --->>> 4隊(duì)列 --->>> 5隊(duì)列 --->>> 6隊(duì)列
沒(méi)這么簡(jiǎn)單。
每個(gè)消費(fèi)者都需要運(yùn)行10秒鐘cpu繁重的任務(wù),然后通信,然后繼續(xù)運(yùn)行繁重任務(wù),然后繼續(xù)通訊,循環(huán)
然后不同的消費(fèi)者通信又有先后順序,怎么實(shí)現(xiàn)?
以TaskProcess 類為例,這個(gè)類只做2個(gè)事情
按你的流水線,在進(jìn)程配置內(nèi)新增進(jìn)程配置(一定要設(shè)置構(gòu)造函數(shù) 參數(shù))
完畢。
進(jìn)程配置
'pipeline_1' => [
'count' => 1,
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型1',
'queue' => '隊(duì)列1'
],
],
'pipeline_2' => [
'count' => 5,
// 根據(jù)任務(wù)繁重情況設(shè)置進(jìn)程數(shù)
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型2',
'queue' => '隊(duì)列2'
],
],
'pipeline_3' => [
'count' => 3,
// 根據(jù)任務(wù)繁重情況設(shè)置進(jìn)程數(shù)
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型3',
'queue' => '隊(duì)列3'
],
],
<?php
namespace process;
use support\Redis;
use Workerman\Timer;
class TaskProcess
{
/**
* @var string
*/
protected string $type;
/**
* @var string
*/
protected string $queue;
/**
* 構(gòu)造函數(shù)
* @param string $type
* @param string $queue
*/
public function __construct(string $type, string $queue)
{
$this->type = $type;
$this->queue = $queue;
}
/**
* 根據(jù)類型,得到下一個(gè)工作流隊(duì)列名稱
* @return string
*/
public function getNextQueue(): string
{
//todo...
switch ($this->type) {
case '':
return '';
default:
return '';
}
}
/**
* 進(jìn)程啟動(dòng)時(shí)執(zhí)行
* @return void
*/
public function onWorkerStart(): void
{
Timer::add(1, function () {
//當(dāng)前工作流隊(duì)列內(nèi)有任務(wù)
if (Redis::lLen($this->queue)) {
$data = Redis::lPop($this->queue);
//todo... 完成當(dāng)前流程,得到結(jié)果
$result = call_user_func([$this, $this->type], $data);
//todo... 投遞到下個(gè)工作流隊(duì)列
Redis::rPush($this->getNextQueue(), $result);
}
});
}
/**
* step1
* @param $data
* @return void
*/
protected function step1($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step2
* @param $data
* @return void
*/
protected function step2($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step3
* @param $data
* @return void
*/
protected function step3($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step4
* @param $data
* @return void
*/
protected function step4($data): void
{
//todo... 業(yè)務(wù)邏輯
}
}
$data = Redis::lPop($this->queue);并不會(huì)阻塞啊,要等待數(shù)據(jù)過(guò)來(lái),到了就立馬執(zhí)行
而且用redis獲取數(shù)據(jù)時(shí)多個(gè)進(jìn)程間會(huì)相互爭(zhēng)奪數(shù)據(jù),而需求是進(jìn)程1 的數(shù)據(jù)傳給進(jìn)程2,進(jìn)程2 的傳給進(jìn)程3……,有順序的
回復(fù)1:Redis::lPop($this->queue);是定時(shí)器拉起執(zhí)行的,執(zhí)行處理任務(wù)的過(guò)程中就是阻塞的。
回復(fù)2:redis獲取時(shí)多進(jìn)程會(huì)相互爭(zhēng)奪數(shù)據(jù),納尼?實(shí)例化進(jìn)程對(duì)象的時(shí)候,隊(duì)列名都不一樣,爭(zhēng)奪屁的數(shù)據(jù)??
回復(fù)3:核心邏輯是各個(gè)任務(wù)進(jìn)程并行處理任務(wù);用redis的列表(隊(duì)列)實(shí)現(xiàn)了n個(gè)任務(wù)進(jìn)程通信,并行處理任務(wù),不理解嗎?(實(shí)例化的時(shí)候,每個(gè)進(jìn)程監(jiān)聽(tīng)的隊(duì)列名字不是同一個(gè),理解不了嗎)
理解。
回復(fù)1:靠定時(shí)器拉起,中間會(huì)有浪費(fèi)的時(shí)間,64個(gè)進(jìn)程一輪下來(lái)就浪費(fèi)64秒,有沒(méi)有實(shí)時(shí)響應(yīng)的辦法?
丟包報(bào)錯(cuò)的原因是你執(zhí)行過(guò)程中是堵塞的,進(jìn)程通信組件發(fā)過(guò)去,也收不到;
所以,while(1){
usleep(10000);
//todo... 從當(dāng)前隊(duì)列內(nèi)拿上個(gè)結(jié)果,處理后丟進(jìn)下個(gè)進(jìn)程隊(duì)列。。。
}
需要阻塞,數(shù)據(jù)來(lái)了立馬執(zhí)行立馬傳給下一個(gè)進(jìn)程,分散到多個(gè)進(jìn)程的目的就是為了最大化減少運(yùn)行時(shí)間。
子進(jìn)程A:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)1,收到后馬上處理數(shù)據(jù)然后返回?cái)?shù)據(jù)2 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程B:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)2,處理成數(shù)據(jù)3返回 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程C:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)3,處理成數(shù)據(jù)4返回 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
最終的目的是要分散到64個(gè)進(jìn)程同時(shí)計(jì)算
A進(jìn)程(pipeline_1進(jìn)程)等待隊(duì)列1的任務(wù),定時(shí)器每秒檢查【隊(duì)列1】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列2】
B進(jìn)程(pipeline_2進(jìn)程)等待隊(duì)列2的任務(wù),定時(shí)器每秒檢查【隊(duì)列2】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列3】
C進(jìn)程(pipeline_3進(jìn)程)等待隊(duì)列3的任務(wù),定時(shí)器每秒檢查【隊(duì)列3】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列4】
……
X進(jìn)程(pipeline_X進(jìn)程)收到結(jié)果,入庫(kù) 或者 進(jìn)行下一輪的調(diào)度;
進(jìn)程間通信用Redis的列表,這么難理解嗎?
主進(jìn)程,子進(jìn)程。
你應(yīng)該是沒(méi)看完手冊(cè),
webman,workerman 你知道主進(jìn)程是干嘛用的嗎,
你開(kāi)的每一個(gè)進(jìn)程都是平級(jí)的,workerman,webman的主進(jìn)程用來(lái)
for ($process_sub>= 你設(shè)置的進(jìn)程數(shù)){
創(chuàng)建一個(gè)子進(jìn)程
}
workerman,webman的主進(jìn)程主要是用來(lái)supervise用的,你還想主進(jìn)程做啥呢。
你要是希望一個(gè)一個(gè)順序處理,應(yīng)該用到隊(duì)列
你這樣的想法,實(shí)際上是線程模型的思想,線程模型的思想主要是讓主線程分配,子線程執(zhí)行,主線程回收數(shù)據(jù);
進(jìn)程模型一般不會(huì)有這樣的思路,通常來(lái)說(shuō)不會(huì)通過(guò)主進(jìn)程來(lái)進(jìn)行任務(wù)的分配,而是通過(guò)一些共同特征,讓子進(jìn)程自己通過(guò)特征來(lái)獲取自己應(yīng)該獲取的任務(wù),而主進(jìn)程只負(fù)責(zé)一件事情,就是子進(jìn)程的正常運(yùn)行和整體服務(wù)的正常運(yùn)行;
如果你想要強(qiáng)行實(shí)現(xiàn)主進(jìn)程分配任務(wù)讓固定子進(jìn)程進(jìn)行處理,我建議每一個(gè)子進(jìn)程都與主進(jìn)程分別建立一條專屬的channel,然后通過(guò)發(fā)布訂閱的方式進(jìn)行業(yè)務(wù)處理
進(jìn)程很難在同一時(shí)間并行處理,線程也如此,因?yàn)橐慌_(tái)服務(wù)器上運(yùn)行的進(jìn)程有很多,每個(gè)進(jìn)程都會(huì)有自己主動(dòng)和被動(dòng)出讓cpu執(zhí)行時(shí)間的間歇,同時(shí)并行的數(shù)量和cpu數(shù)量是相同的;
進(jìn)程間通訊如果使用socket,那么就存在數(shù)據(jù)需要在用戶態(tài)和內(nèi)核態(tài)進(jìn)行拷貝操作,其實(shí)效率沒(méi)有那么高;
如果在共享內(nèi)存中,操作會(huì)被加鎖,可能會(huì)存在互斥;
把各個(gè)業(yè)務(wù)拆分成A\B\C\D來(lái)分別執(zhí)行,在理論上可能效率很高,但在實(shí)際的操作過(guò)程中存在上述甚至更多的影響效率的點(diǎn),所以還不如將業(yè)務(wù)放在一個(gè)進(jìn)程中執(zhí)行,比如假設(shè)一個(gè)延遲并不高的業(yè)務(wù),通過(guò)拆分到不同進(jìn)程執(zhí)行,最后的結(jié)果只可能比單進(jìn)程慢,而不會(huì)快;
假設(shè)存在慢業(yè)務(wù),完全可以做成生產(chǎn)消費(fèi)模式,將存在大量ip的或者阻塞時(shí)間較長(zhǎng)的交給消費(fèi)隊(duì)列來(lái)執(zhí)行