国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

playcatqueue的升級版之playcat/queue-webman

1.2.2 版本
2024-12-19 版本更新時間
63 安裝
2 star

簡介

之所以又重新做了一版是因為之前的版本還有一些功能沒實現(xiàn),但我又想能支持webman和tp+swoole,索性重構(gòu)了代碼。

  • 支持Redis單機或集群 (redis >= 5.0)
  • 支持Kafka
  • 支持RabbitMQ
  • 自定義異常與重試流程
  • (新)支持延遲消息數(shù)據(jù)持久化

安裝

composer require "playcat/queue-webman"

1.配置

1.1

編輯 config\plugin\playcat\queue\ 目錄下的app.phpprocess.php。修改相應內(nèi)容為自己環(huán)境使用的配置。

1.2 初始化數(shù)據(jù)庫(只需一次)

php webman timerserver:initdb

2.創(chuàng)建消費任務(wù)

新建一個php的文件并且添加以下內(nèi)容:

<?php

namespace app\queue\playcat;

use Playcat\Queue\Protocols\ConsumerDataInterface;
use Playcat\Queue\Protocols\ConsumerInterface;

class playcatConsumer1 implements ConsumerInterface
{
    //任務(wù)名稱,對應發(fā)布消息的名稱
    public $queue = 'playcatqueue';

    public function consume(ConsumerData $payload)
    {
        //獲取發(fā)布到隊列時傳入的內(nèi)容
        $data = $payload->getQueueData();
        ...你自己的業(yè)務(wù)邏輯
        //休息10s
        sleep(10);
        echo('done!');
    }
}

ConsumerData方法

  • getID: 當前消息的id
  • getRetryCount(): 當前任務(wù)已經(jīng)重試過的次數(shù)
  • getQueueData(): 當前任務(wù)傳入的參數(shù)
  • getChannel(): 當前所執(zhí)行的任務(wù)名稱
    • -

將上面編寫好的任務(wù)文件保存項目中'app/queue/playcat/'下(如果目錄不存在則自己手動創(chuàng)建)

啟動服務(wù):

啟動:
php start.php start

重載:可在不重啟服務(wù)的情況下更新業(yè)務(wù)
php start.php reload

停止:
php start.php stop

如果沒有錯誤出現(xiàn)則表示啟動完成

添加任務(wù)并且提交到隊列中

use Playcat\Queue\Manager;
use Playcat\Queue\Protocols\ProducerData;
//使用協(xié)程的方式,如果需要并行數(shù)據(jù)發(fā)布需要自行實現(xiàn)Manager的連接池
  //即時消費消息
  $payload = new ProducerData();
  //對應消費隊列里的任務(wù)名稱
  $payload->setChannel('test');
  //對應消費隊列里的任務(wù)使用的數(shù)據(jù)
  $payload->setQueueData([1,2,3,4]);
  //推入隊列并且獲取消息id
  $id = Manager::getInstance()->push($payload);

  //延遲消費消息
  $payload_delay = new ProducerData();
  $payload_delay->setChannel('test');
  $payload_delay->setQueueData([6,7,8,9]);
  //設(shè)置60秒后執(zhí)行的任務(wù)
  $payload_delay->setDelayTime(60);
  //推入隊列并且獲取消息id
  $id = Manager::getInstance()->push($payload_delay);
  //取消延遲消息
  Manager::getInstance()->del($id);

ProducerData方法

  • setChannel: 設(shè)置推入消息的隊列名稱
  • setQueueData: 設(shè)置傳入到消費任務(wù)的數(shù)據(jù)
  • setDelayTime: 設(shè)置延遲時間(秒)
    • -

異常與重試機制

任務(wù)在執(zhí)行過程中未拋出異常則默認執(zhí)行成功,否則則進入重試階段.
重試次數(shù)和時間由配置控制,重試間隔時間為當前重試次數(shù)的冪函數(shù)。
Playcat\Queue\Exceptions\DontRetry異常會忽略掉重試

其它

基于tp和swoole的隊列系統(tǒng)
playcat-queue-tpswoole

QQ:318274085

贊助商