public static function onWorkerStart($worker)
{
Timer::add(5, function () {
if(empty(self::$heartbeat_queue)){
return;
}
StoreWs::where("id", 'in', self::$heartbeat_queue)->update(['heartbeat_time' => time()]);
self::$heartbeat_queue = [];
});
}
可這樣子嵌套 ?
Timer::add($worker->id+1, function () {
Timer::add(5, function () {
if(empty(self::$heartbeat_queue)){
return;
}
StoreWs::where("id", 'in', self::$heartbeat_queue)->update(['heartbeat_time' => time()]);
self::$heartbeat_queue = [];
});
},[],false);
估計(jì)你是想任務(wù)在不同進(jìn)程不同時(shí)執(zhí)行吧,我做過類似的,把任務(wù)按順序分配到不同的進(jìn)程。
$threadTotal = $worker->count; //總進(jìn)程數(shù)量 n
$threadId = $worker->id; //當(dāng)前進(jìn)程編號 1 ~ (n-1)
$i = -1;
foreach ($taskList as $li) {
$i += 1;
if ($i % $threadTotal != $threadId) {
continue;
}
$this->curl($li['url']);
}
$queueIds = [];
foreach (self::$heartbeat_queue as $i => $q) {
if ($i % $threadTotal != $threadId) {
continue;
}
$queueIds[]= $q;
unset(self::$heartbeat_queue[$i]);
}
StoreWs::where("id", 'in', $queueIds)->update(['heartbeat_time' => time()]);
同時(shí)執(zhí)行,但不同的進(jìn)程不會對同一個(gè)queueId重復(fù)執(zhí)行。