国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

關(guān)于webman多次消費(fèi)rabbitmq fanout消息

calvin

問(wèn)題描述

現(xiàn)有個(gè)需求是需要把物聯(lián)網(wǎng)設(shè)備的狀態(tài)同步到兩個(gè)獨(dú)立的系統(tǒng)中(都是用webman開(kāi)發(fā))。計(jì)劃設(shè)備回來(lái)的消息扔到mq中,然后兩個(gè)系統(tǒng)去消費(fèi)處理各自的業(yè)務(wù),但發(fā)現(xiàn)消息被消費(fèi)了多次,單條消息消費(fèi)次數(shù)也不等于進(jìn)程數(shù)。

為此你搜索到了哪些方案及不適用的原因

我是在進(jìn)程啟動(dòng)后做的監(jiān)聽(tīng)操作,代碼如下:
截圖
app\bootstrap\SubscribeRabbitmq.php

    public static function start($worker)
    {
        $is_console = !$worker;
        if ($is_console) {
            // If you do not want to execute this in console, just return.
            return;
        }
        if ($worker->id === 0) {
            Log::channel('mq')->info('【初始化MQ】' . getmypid());
            //  連接 RabbitMQ 實(shí)例并創(chuàng)建頻道
            (new Client([
                'vhost'    => envs('MQ_VHOST', '/'),
                'user'     => envs('MQ_USERNAME', 'guest'),
                'password' => envs('MQ_PASSWORD', 'guest')
            ]))->connect()
                ->then(function (Client $client) {
                    return $client->channel();
                })->then(function (Channel $channel) {
                    //  聲明【設(shè)備狀態(tài)變更】交換機(jī)和匿名隊(duì)列
                    return $channel->exchangeDeclare('device.status.change', 'fanout')->then(function () use ($channel
                    ) {
                        return $channel->queueDeclare('', false, false, true, false);
                    })->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
                        // 獲取綁定到隊(duì)列上的幀對(duì)象,并將該幀的隊(duì)列名傳遞給 $channel->queueBind() 函數(shù)來(lái)將隊(duì)列綁定到 【設(shè)備狀態(tài)變更】 交換機(jī)。
                        return $channel->queueBind($frame->queue, 'device.status.change')->then(function () use ($frame
                        ) {
                            return $frame;
                        });
                    })->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
                        //  設(shè)置消息消費(fèi)函數(shù),設(shè)置 $channel->consume() 函數(shù),當(dāng)消費(fèi)者監(jiān)聽(tīng)一個(gè)虛擬主機(jī)和隊(duì)列時(shí),該函數(shù)將一直運(yùn)行,等待接收實(shí)例的消息
                        $channel->consume(
                            function (Message $message, Channel $channel, Client $client) {
                                Log::channel('mq')->info('【MQ-consume】收到一條待消費(fèi)數(shù)據(jù)進(jìn)程ID:' . getmypid(),
                                    ['message_content' => $message->content]);
                            },
                            $frame->queue,
                            '',
                            false,
                            true
                        );
                    });
                });
        }
    }

記錄的日志:
截圖

兩個(gè)疑問(wèn):
1.不是很理解為啥有些消息消費(fèi)了3次,有些2次,還有些消費(fèi)的進(jìn)程id和初始的不是同一個(gè)
2.我預(yù)期的是每個(gè)系統(tǒng)只要消費(fèi)一次,這樣的話不是應(yīng)該用自定義進(jìn)程?

1267 1 0
1個(gè)回答

智佳思遠(yuǎn)

自定義進(jìn)程。bootstrap是每個(gè)進(jìn)程都執(zhí)行,包括webman的http進(jìn)程和monitor進(jìn)程,雖然你設(shè)置了worker->id === 0,但是http 0號(hào)進(jìn)程和 monitor 0號(hào)進(jìn)程都參與消費(fèi)了,所以是兩個(gè)消費(fèi)進(jìn)程在消費(fèi)。至于重復(fù)消費(fèi),是不是沒(méi)有ack?或者生產(chǎn)者確實(shí)發(fā)布了多個(gè)重復(fù)的消息?

  • 8355 2023-04-17

    我猜是exchange和queue配置問(wèn)題

  • calvin 2023-04-17

    生產(chǎn)者重發(fā)發(fā)布這個(gè)可以排除。改成自定義進(jìn)程就沒(méi)問(wèn)題了 謝謝大佬們

年代過(guò)于久遠(yuǎn),無(wú)法發(fā)表回答
??