国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

Gatewayworker聊天項目中利用消息隊列Kafka解偶

jihan

問題描述

萌新,在聊天項目中,每次發(fā)送消息時都經(jīng)過消息隊列,想使用Kafka,但不知道如何和gatewayworker結合。邏輯是寫在EVENT的onMessage里,每次接收到消息往隊列里推,然后同時又接收,再調(diào)gateway的方法嗎?各位大佬能給個demo嗎?

1938 4 0
4個回答

taozywu
  1. 首先 邏輯是寫在EVENT的onMessage里 這是對的
  2. 然后在邏輯中可以通過某個字段(Eg. type)來區(qū)分是什么數(shù)據(jù)后,將數(shù)據(jù)入隊即可
  3. 在單獨寫一個消費腳本來處理Kafka的數(shù)據(jù)就行。

類似Code如下

<?php
require_once __DIR__ . './autoload.php';

use Workerman\Worker;

$consumer = new Worker();
$consumer->count = 8;

$consumer->onWorkerStart = function () {
   // @TODO Your code here...
   Worker::stopAll();
};

Worker::runAll();
  • jihan 2022-10-12

    感謝大佬,但是有個疑問就是,單獨寫一個消費腳本來處理Kafka的數(shù)據(jù)時需要用到gateway的方法,比如A發(fā)送消息給B,在消費時要調(diào)用gateay的方法發(fā)送消息給B. 這點如何在消費腳本里做到呢?

  • 晚安。 2022-10-12

    這樣數(shù)量大的時候,收發(fā)消息會有點延遲把

  • jihan 2022-10-12

    解偶了肯定會有延遲的,但消費方可以一直擴展

不行可以用redis-queue隊列也可以的

  • jihan 2022-10-12

    感謝大佬,但用那種應該都是一樣,只是在消費時如何調(diào)用gateway得方法發(fā)送消息?場景:A發(fā)送消息給B, 消息在隊列里消費時,如何發(fā)送給B?

  • muyu 2022-10-12

    單獨起一個進程,拉取中間件數(shù)據(jù)推送給響應的客戶端

  • jihan 2022-10-12

    emm, 具體的就是在這個進程里調(diào)用Gateway::sendToClient($client_id, $data) ?然后入隊列的時候把client_id 傳過來就行了嗎?我的疑問點就是在消費腳本里直接調(diào)用Gateway得方法不知道可不可行?

  • muyu 2022-10-12

    本身其實可以直接發(fā)給客戶端,但是你不是想削峰解耦嗎,所以入隊列把雙方client_id和必要信息都入棧,出棧的時候在根據(jù)雙方是否在線發(fā)送消息

  • jihan 2022-10-12

    懂了,感謝大佬~

muyu

Kafka不大適合聊天推送吧,消費后數(shù)據(jù)不清除

  • 暫無評論
taozywu

感謝大佬,但是有個疑問就是,單獨寫一個消費腳本來處理Kafka的數(shù)據(jù)時需要用到gateway的方法,比如A發(fā)送消息給B,在消費時要調(diào)用gateay的方法發(fā)送消息給B. 這點如何在消費腳本里做到呢?

TestConsumer.php

<?php
require_once __DIR__ . './autoload.php';

use Workerman\Worker;

$consumer = new Worker();
$consumer->count = 8;

$consumer->onWorkerStart = function () {
   // @TODO Your code here...
   // @TODO Your code here...

   //TestModule->dealData();

   Worker::stopAll();
};

Worker::runAll();

TestModule.php

GatewayWorker\Lib\Gateway 自行下載獨立文件

<?php

use GatewayWorker\Lib\Gateway;

function dealData() {
    // Eg.拿到Kafka的一條數(shù)據(jù) & json
    $data = json_decode($msg, true);

    if (!$data) {
        return false;
    }

    switch($data['type']) {
        case 'A_to_B':
            Gateway::sendToUid("B", json_encode($msg));
            break;
        case 'B_to_A':
            Gateway::sendToUid("A", json_encode($msg));
            break;
    }

}
  • jihan 2022-10-12

    get, 感謝大佬

  • taozywu 2022-10-12

    Kafka數(shù)據(jù)如果比較多,可以適當加大進程數(shù)

  • jihan 2022-10-12

    kk??

年代過于久遠,無法發(fā)表回答
??