現(xiàn)有個(gè)需求是需要把物聯(lián)網(wǎng)設(shè)備的狀態(tài)同步到兩個(gè)獨(dú)立的系統(tǒng)中(都是用webman開(kāi)發(fā))。計(jì)劃設(shè)備回來(lái)的消息扔到mq中,然后兩個(gè)系統(tǒng)去消費(fèi)處理各自的業(yè)務(wù),但發(fā)現(xiàn)消息被消費(fèi)了多次,單條消息消費(fèi)次數(shù)也不等于進(jìn)程數(shù)。
我是在進(jìn)程啟動(dòng)后做的監(jiān)聽(tīng)操作,代碼如下:
app\bootstrap\SubscribeRabbitmq.php
public static function start($worker)
{
$is_console = !$worker;
if ($is_console) {
// If you do not want to execute this in console, just return.
return;
}
if ($worker->id === 0) {
Log::channel('mq')->info('【初始化MQ】' . getmypid());
// 連接 RabbitMQ 實(shí)例并創(chuàng)建頻道
(new Client([
'vhost' => envs('MQ_VHOST', '/'),
'user' => envs('MQ_USERNAME', 'guest'),
'password' => envs('MQ_PASSWORD', 'guest')
]))->connect()
->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
// 聲明【設(shè)備狀態(tài)變更】交換機(jī)和匿名隊(duì)列
return $channel->exchangeDeclare('device.status.change', 'fanout')->then(function () use ($channel
) {
return $channel->queueDeclare('', false, false, true, false);
})->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
// 獲取綁定到隊(duì)列上的幀對(duì)象,并將該幀的隊(duì)列名傳遞給 $channel->queueBind() 函數(shù)來(lái)將隊(duì)列綁定到 【設(shè)備狀態(tài)變更】 交換機(jī)。
return $channel->queueBind($frame->queue, 'device.status.change')->then(function () use ($frame
) {
return $frame;
});
})->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
// 設(shè)置消息消費(fèi)函數(shù),設(shè)置 $channel->consume() 函數(shù),當(dāng)消費(fèi)者監(jiān)聽(tīng)一個(gè)虛擬主機(jī)和隊(duì)列時(shí),該函數(shù)將一直運(yùn)行,等待接收實(shí)例的消息
$channel->consume(
function (Message $message, Channel $channel, Client $client) {
Log::channel('mq')->info('【MQ-consume】收到一條待消費(fèi)數(shù)據(jù)進(jìn)程ID:' . getmypid(),
['message_content' => $message->content]);
},
$frame->queue,
'',
false,
true
);
});
});
}
}
記錄的日志:
兩個(gè)疑問(wèn):
1.不是很理解為啥有些消息消費(fèi)了3次,有些2次,還有些消費(fèi)的進(jìn)程id和初始的不是同一個(gè)
2.我預(yù)期的是每個(gè)系統(tǒng)只要消費(fèi)一次,這樣的話不是應(yīng)該用自定義進(jìn)程?
自定義進(jìn)程。bootstrap是每個(gè)進(jìn)程都執(zhí)行,包括webman的http進(jìn)程和monitor進(jìn)程,雖然你設(shè)置了worker->id === 0,但是http 0號(hào)進(jìn)程和 monitor 0號(hào)進(jìn)程都參與消費(fèi)了,所以是兩個(gè)消費(fèi)進(jìn)程在消費(fèi)。至于重復(fù)消費(fèi),是不是沒(méi)有ack?或者生產(chǎn)者確實(shí)發(fā)布了多個(gè)重復(fù)的消息?