RabbitMQ 斷開后,主動 Worker::stopAll 是出于什么考慮呢?
我想實(shí)現(xiàn)服務(wù)自動恢復(fù),如果RabbitMQ斷開之后,會定時重連。我的服務(wù)在onWorkerStart時比較耗時。
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
輸出為:
RabbitMQ client disconnected: [506] Connection closed by server unexpectedly
子進(jìn)程stopAll就會退出當(dāng)前子進(jìn)程,主進(jìn)程會重新拉起子進(jìn)程,在重新拉起子進(jìn)程的過程中,子進(jìn)程會對rabbitmq進(jìn)行重連,這是最簡單的重連機(jī)制的實(shí)現(xiàn)方案;當(dāng)然還有的做法是可以不用重啟進(jìn)程,在進(jìn)程內(nèi)部用代碼實(shí)現(xiàn)重連,代碼復(fù)雜度會更高,相對這種方案,維護(hù)起來也不方便,兩種方案比較起來也沒有明顯的性能差異,所以選擇一種代碼維護(hù)方便的即可。
之前重啟方案是我提交的PR,你也可以按照以下代碼進(jìn)行修改
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0) {
//定時器執(zhí)行stopAll
//在配置中增加一個重啟定時器字段,比如restart_time
if (($restartTime = $this->options['restart_time'] ?? 0) > 0) {
$this->eventLoop->add(restartTime, EventInterface::EV_TIMER, function() use ($replyCode, $replyText) {
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
});
return null;
} else {
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
}
return $this;
});
如果你有空,你可以提交這個pr,如果暫時沒空,等會兒我去提交個pr
我自己實(shí)現(xiàn)的重連方案沒并有使用 worker::stopAll, 我是希望不重啟子進(jìn)程。 我現(xiàn)在想重寫Client ,去掉Woker::stopAll(), 之后出現(xiàn)這樣的錯誤:
Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/AbstractClient.php:311
Stack trace:
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Log::error("RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
不建議自行實(shí)現(xiàn)重啟功能,本身客戶端高度依賴eventloop,很多準(zhǔn)備工作是在進(jìn)程啟動的時候做的,比如加入定時器、加入事件監(jiān)聽,如果自行實(shí)現(xiàn)重啟,需要對這些工作都要重新再做一遍
你上述方式這樣寫,僅僅只是關(guān)閉了流的監(jiān)聽關(guān)閉了心跳定時器移除了緩沖區(qū),但是你重連的時候,你還需要把流的監(jiān)聽事件加上,并且為rabbitmq-client的連接進(jìn)行重新建立,重新獲取client-id等等
謝謝大佬 !太給力了, 我最開始的思路是想,如果斷開了連接,通過定時器去嘗試連接。 連接成功后覆蓋之前的連接對象。
目前發(fā)送MQ僅僅是服務(wù)的一部分,直接重啟我覺得影響了我其他的工作。
謝謝大佬 !太給力了, 我最開始的思路是想,如果斷開了連接,通過定時器去嘗試連接。 連接成功后覆蓋之前的> 連接對象。
目前發(fā)送MQ僅僅是服務(wù)的一部分,直接重啟我覺得影響了我其他的工作。
那你可以參考這個client是如何進(jìn)行連接的,然后在你的定時器里把連接工作做完,然后重連這里就按你自己寫的方式寫;當(dāng)然我個人覺得重啟當(dāng)前進(jìn)程是最快的做法,絲毫不會影響其他的工作。
這個我再做一個特性吧,就在重連這個地方增加一個注冊的回調(diào)事件,如果加入了注冊回調(diào)事件,就執(zhí)行回調(diào)事件,如果沒有回調(diào)事件,就自動重啟
@walkor 老大,根據(jù)需求,新增了用戶自定義注冊回調(diào);我又調(diào)整了一個pr,已提交
https://github.com/walkor/rabbitmq/pull/15
@mo 你可以參考https://github.com/walkor/rabbitmq/pull/15的方式,在回調(diào)里面得到傳入的client實(shí)例,然后對client實(shí)例進(jìn)行處理銷毀,重新new一個client代替你之前使用的client實(shí)例即可