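While using this project I ran into timers growing without limit, and into cases where the server actively kicked the connection and the client could no longer consume, so I ended up writing my own AMQP client.
Later, when I went back and read through the workerman/rabbitmq source, I found a few places where I can offer suggestions:
Client.php, lines 160 - 170, already creates a persistent (repeating) timer: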
})->then(function () {
    $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true);
    $this->state = ClientStateEnum::CONNECTED;
    return $this;
});
Client.php, lines 226 - 243, nevertheless keeps creating one-shot timers on every call:
public function onHeartbeat()
{
    $now = microtime(true);
    $nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
    if ($now >= $nextHeartbeat) {
        $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
        $this->flushWriteBuffer()->done(function () {
            $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false);
        });
        if (is_callable($this->options['heartbeat_callback'] ?? null)) {
            $this->options['heartbeat_callback']->call($this);
        }
    } else {
        $this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false);
    }
}
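To illustrate why these two registrations together can make the timer count grow, here is a minimal simulation. It is only a sketch: `ToyTimers` is a hypothetical stand-in that counts pending timers, not Workerman's `Timer`, time is measured in whole heartbeat intervals, and it assumes every `onHeartbeat` call re-arms exactly one one-shot timer one interval later (a simplification of both branches above).

```php
<?php
// Hypothetical toy scheduler: it only tracks pending timers, nothing more.
final class ToyTimers
{
    /** @var array<int, array{due: int, persistent: bool}> */
    private array $timers = [];
    private int $nextId = 1;

    public function add(int $due, bool $persistent): int
    {
        $this->timers[$this->nextId] = ['due' => $due, 'persistent' => $persistent];
        return $this->nextId++;
    }

    /** Fire every timer due at $now; $onFire plays the role of onHeartbeat(). */
    public function tick(int $now, callable $onFire): void
    {
        // foreach by value walks a snapshot, so timers added during this tick do not fire yet.
        foreach ($this->timers as $id => $timer) {
            if ($timer['due'] !== $now) {
                continue;
            }
            if ($timer['persistent']) {
                $this->timers[$id]['due'] = $now + 1; // repeating timer stays registered
            } else {
                unset($this->timers[$id]);            // one-shot timer is removed once fired
            }
            $onFire($now);
        }
    }

    public function count(): int
    {
        return count($this->timers);
    }
}

$timers = new ToyTimers();
$timers->add(1, true); // the persistent timer registered after connecting

// Simplified onHeartbeat(): every call re-arms a one-shot timer for the next interval.
$onHeartbeat = function (int $now) use ($timers): void {
    $timers->add($now + 1, false);
};

$counts = [];
for ($now = 1; $now <= 5; $now++) {
    $timers->tick($now, $onHeartbeat);
    $counts[] = $timers->count();
}

echo implode(' ', $counts), PHP_EOL; // prints "2 3 4 5 6"
```

Under these assumptions each tick of the persistent timer starts a new self-sustaining chain of one-shot timers, so the number of pending timers grows by one per heartbeat interval.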
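My understanding is that the time-difference check in onHeartbeat is there to fire the callback more precisely. Personally, though, I think workerman's timer is already accurate to the millisecond, so the extra check is redundant and the heartbeat can simply be sent directly. The code is as follows: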
public function onHeartbeat(): void
{
    $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
    $this->flushWriteBuffer()->then(
        function () {
            if (is_callable(
                isset($this->options['heartbeat_callback'])
                    ? $this->options['heartbeat_callback']
                    : null
            )) {
                // I did not keep bunny's way of invoking the callback here and wrote my own; switch back to the bunny call below if needed
                ($this->options['heartbeat_callback'])($this);
                // $this->options['heartbeat_callback']->call($this);
            }
        },
        function (\Throwable $throwable){
            if($this->log){
                $this->log->notice(
                    'OnHeartbeatFailed',
                    [
                        $throwable->getMessage(),
                        $throwable->getCode(),
                        $throwable->getFile(),
                        $throwable->getLine()
                    ]
                );
            }
            AbstractProcess::kill("OnHeartbeatFailed-{$throwable->getMessage()}");
        });
}
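AbstractProcess::kill() is simply a method that kills the current process. Because the bunny client runs on asynchronous promises, an error ends up in the onRejected callback passed to then. My reasoning is that a failed heartbeat can, in some situations, affect the liveness of the connection, after which the server kicks it; since the client has no complete reconnection mechanism, the process is then left hanging. So I kill the current process at this point and let the workerman master process spawn a new one, which reconnects and resumes processing and consuming without disturbing the workflow. The kill method looks like this: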
public static function kill(?string $log = null)
{
    if(self::$_masterPid === ($pid = posix_getpid())){
        self::stopAll(SIGKILL, $log);
    }else{
        self::log("(pid:{$pid}) {$log}");
        posix_kill($pid, SIGKILL);
    }
}
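As mentioned above, the disconnect method in Client.php, lines 182 - 220, has no reconnection strategy either. After the connection is closed from the RabbitMQ management UI, the process lingers like a zombie: it can neither exit nor consume. So I suggest the following change: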
public function disconnect($replyCode = 0, $replyText = '') : Promise\PromiseInterface
{
    if ($this->state === ClientStateEnum::DISCONNECTING) {
        return $this->disconnectPromise;
    }
    if ($this->state !== ClientStateEnum::CONNECTED) {
        return Promise\reject(new ClientException("Client is not connected."));
    }
    $this->state = ClientStateEnum::DISCONNECTING;
    $promises = [];
    if ($replyCode === 0) {
        foreach ($this->channels as $channel) {
            $promises[] = $channel->close($replyCode, $replyText);
        }
    }
    else{
        foreach($this->channels as $channel){
            $this->removeChannel($channel->getChannelId());
        }
    }
    if ($this->heartbeatTimer) {
        Timer::del($this->heartbeatTimer);
        $this->heartbeatTimer = null;
    }
    return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
        if (!empty($this->channels)) {
            throw new \LogicException("All channels have to be closed by now.");
        }
        if($replyCode !== 0){
            return null;
        }
        return $this->connectionClose($replyCode, $replyText, 0, 0);
    })->then(function () use ($replyCode, $replyText){
        $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
        $this->closeStream();
        $this->init();
        if($replyCode !== 0){
            // Kill the current process and let the master process restart it
            AbstractProcess::kill("{$replyCode}-{$replyText}");
        }
        return $this;
    });
}
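Finally, I really like workerman and its ecosystem. I originally wanted to use the workerman ecosystem components directly, but my project had a very tight release deadline at the time, so I set workerman/rabbitmq aside and wrote my own casual/amqp. I hope workerman keeps getting better and better. Keep it up!
casual/amqp project: https://github.com/chaz6chez/simple-amqp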
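Thank you very much for the suggestions!
If possible, please send a PR to workerman/rabbitmq.
Business code always runs in child processes, so there is no need to check master_pid. To have workerman restart the current process, just call Worker::stopAll(); you can also use posix_kill(posix_getpid(), SIGINT);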
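I plan to help maintain the workerman/rabbitmq project over the long term. I have brought rabbitmq into my casual/amqp project, and it is currently running in production.
Following up on the above: it would be great if Workerman could provide a command or method for restarting a child process. Right now AbstractProcess extends Worker and adds a kill method, but that method also relies on the KILL signal. Is there a more graceful signal or method I could use instead?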
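One relevant difference between the two signals discussed in this thread is that SIGINT can be caught while SIGKILL cannot, so SIGINT gives the process a chance to clean up before it stops. The sketch below shows this with plain pcntl/posix calls only. It assumes the pcntl and posix extensions are available (PHP CLI on a POSIX system), the cleanup step is a placeholder, and Workerman's own signal handling is not reproduced here.

```php
<?php
// Minimal sketch: catch SIGINT in the current process and run cleanup code.
$cleanedUp = false;

// Register a handler; SIGKILL could not be intercepted like this.
pcntl_signal(SIGINT, function (int $signo) use (&$cleanedUp): void {
    // Placeholder: a real worker would close its AMQP connection and delete timers here.
    $cleanedUp = true;
});

posix_kill(posix_getpid(), SIGINT); // send SIGINT to this very process
pcntl_signal_dispatch();            // run the handlers for signals received so far

var_dump($cleanedUp);
```

In this sketch the process keeps running after the handler returns; a real worker would end its event loop from the handler so that the master process can start a replacement.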