国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

支持多種消息服務(wù)端的隊(duì)列應(yīng)用

2.2.0 版本
2023-04-25 版本更新時(shí)間
396 安裝
10 star

<h1 align="center">Playcat Queue</h1>

<p align="center">webman下的消息隊(duì)列服務(wù)</p>

特點(diǎn)

  1. 多種消息系統(tǒng)支持(Redis,Kafka)
  2. 高效的消息延遲和異常重試機(jī)制
  3. 支持大數(shù)據(jù)量處理場(chǎng)景

支持的消息系統(tǒng)

Redis Stream (已完成)

Redis Cluster Stream (已完成)

Kafka(已完成)

更多。。。

環(huán)境需求

  • PHP >= 7.2
  • webman >= 1.4
  • PHP Redis擴(kuò)展 (使用redis)
  • PHP RdKafka擴(kuò)展 (使用kafka)

安裝

$ composer require "playcat/queue"

使用方法

1.選擇自己的消息服務(wù)

  • 使用Redis Stream(默認(rèn))
    所用Redis的版本 >=5.0
    修改config\plugin\playcat\queue\redis.php為自己redis的配置即可

  • 使用Redis Cluster Stream
    所用Redis的版本 >=5.0并且配置好自己的集群環(huán)境
    編輯config\plugin\playcat\queue\manager.php修改里面的driver為如下內(nèi)容

'driver' => \Playcat\Queue\Driver\Rediscluster::class,

編輯config\plugin\playcat\queue\rediscluster.php,替換對(duì)應(yīng)的redis的配置即可

  • 使用Kafka

  • 創(chuàng)建Kafka的topic

./kaftopics.sh --create --bootstrap-server xxx:9092 --replication-factor 1 --partitions 1 --topic 任務(wù)名稱
  • 編輯config\plugin\playcat\queue\manager.php修改里面的driver為如下內(nèi)容
'driver' => \Playcat\Queue\Driver\Kafka::class,

編輯config\plugin\playcat\queue\Kafka.php,替換對(duì)應(yīng)的Kafka的配置

2.創(chuàng)建消費(fèi)者任務(wù)

新建一個(gè)名為'Test.php'文件添加以下內(nèi)容:

<?php

namespace app\queue\playcat;

use Playcat\Queue\Model\Payload;
use Playcat\Queue\Protocols\Consumer;

class Test implements Consumer
{
    //任務(wù)名稱
    public $queue = 'test';

    public function consume(Payload $payload)
    {
        //獲取自定義傳入的內(nèi)容
        $data = $payload->getQueueData();
        ...
    }
}

將'Test.php'保存到'

app/queue/playcat/'目錄下。如果目錄不存在就創(chuàng)建它(
==可以編輯config/plugin/playcat/queue/process.php中的consumer_dir的地址來改變==)

啟動(dòng)webman的服務(wù)

$ php start.php start

添加任務(wù)

use Playcat\Queue\Manager;
use Playcat\Queue\Model\Payload;
$payload = new Payload();
//對(duì)應(yīng)消費(fèi)隊(duì)列里的任務(wù)名稱
$payload->setChannel('test');
//對(duì)應(yīng)消費(fèi)隊(duì)列里的任務(wù)使用的數(shù)據(jù)
$payload->setQueueData([1,2,3,4]);
//創(chuàng)建一個(gè)立即執(zhí)行的任務(wù)
Manager::getInstance()->push($payload);

$payload_delay = new Payload();
//對(duì)應(yīng)消費(fèi)隊(duì)列里的任務(wù)名稱
$payload_delay->setChannel('test');
//對(duì)應(yīng)消費(fèi)隊(duì)列里的任務(wù)使用的數(shù)據(jù)
$payload_delay->setQueueData([6,7,8,9]);
//設(shè)置60秒后執(zhí)行的任務(wù)
$payload_delay->setDelayTime(60);
Manager::getInstance()->push($payload_delay);`

異常與重試機(jī)制

任務(wù)在執(zhí)行過程中未拋出異常則默認(rèn)執(zhí)行成功,否則則進(jìn)入重試階段.
重試次數(shù)和時(shí)間由配置控制,重試間隔時(shí)間為當(dāng)前重試次數(shù)的冪函數(shù)。
Playcat\Queue\Exceptions\DontRetry異常會(huì)忽略掉重試

Playcat\Queue\Model\Payload

  • getID: 當(dāng)前任務(wù)的唯一id
  • getRetryCount(): 當(dāng)前任務(wù)已經(jīng)重試過的次數(shù)
  • getQueueData(): 當(dāng)前任務(wù)傳入的參數(shù)
  • getChannel(): 當(dāng)前所執(zhí)行的任務(wù)名稱

License

MIT

贊助商