国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

workerman/redis-queue

基于Redis的消息隊(duì)列,支持消息延遲處理。

項(xiàng)目地址:

https://github.com/walkor/redis-queue

安裝:

composer require workerman/redis-queue

示例

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // 訂閱
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // 訂閱
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // 消費(fèi)失敗觸發(fā)的回調(diào)(可選)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "隊(duì)列 " . $package['queue'] . " 消費(fèi)失敗\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // 定時(shí)向隊(duì)列發(fā)送消息
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

創(chuàng)建實(shí)例

  • $address 類似 redis://ip:6379,必須以redis開(kāi)頭.

  • $options 包括以下選項(xiàng):

    • auth: 鑒權(quán)信息,默認(rèn) ''
    • db: db,默認(rèn) 0
    • max_attempts: 消費(fèi)失敗后重試次數(shù),默認(rèn)5
    • retry_seconds: 重試時(shí)間間隔,單位秒。默認(rèn)5

消費(fèi)失敗是指業(yè)務(wù)拋出異常Exception或者Error。消費(fèi)失敗后消息會(huì)放到延遲隊(duì)列等待重試,重試次數(shù)由 max_attempts控制,重試間隔由retry_secondsmax_attempts共同控制。比如max_attempts為5,retry_seconds為10,第1次重試間隔為1*10秒,第2次重試時(shí)間間隔為2*10秒,第3次重試時(shí)間間隔為3*10秒,以此類推直到重試5次。如果超過(guò)了max_attempts設(shè)置測(cè)重試次數(shù),則消息放入key為{redis-queue}-failed(1.0.5版本之前為redis-queue-failed)的失敗隊(duì)列


send(String $queue, Mixed $data, [int $dely=0])

向隊(duì)列發(fā)送一條消息

  • $queue 隊(duì)列名, String 類型
  • $data 發(fā)布的具體消息,可以是數(shù)組或者字符串,Mixed 類型
  • $dely 延遲消費(fèi)時(shí)間,單位秒,默認(rèn)0, Int 類型

subscribe(mixed $queue, callable $callback)

訂閱一個(gè)隊(duì)列或者多個(gè)隊(duì)列

  • $queue 隊(duì)列名,可以是字符串或者包含多個(gè)隊(duì)列名的數(shù)組
  • $callback 回調(diào)函數(shù),格式為 function (Mixed $data),其中$data就是send($queue, $data)中的$data.

unsubscribe(mixed $queue)

取消訂閱

  • $queue 隊(duì)列名或者包含多個(gè)隊(duì)列名的數(shù)組

onConsumeFailure(callable $callback)

消費(fèi)失敗時(shí)觸發(fā)

  • $callback - function (\Throwable $exception, array $package), $package是隊(duì)列內(nèi)部數(shù)據(jù)結(jié)構(gòu),包含了data queue attempts max_attempts等信息

支持更改內(nèi)部數(shù)據(jù)結(jié)構(gòu)$package的值,只需要將更改后的$package return 即可。例如當(dāng)某個(gè)消息放生某種錯(cuò)誤時(shí)不希望再次重試,代碼類似

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "失敗的隊(duì)列名:" . $package['queue'] . "\n";
    // 當(dāng)隊(duì)列發(fā)生某種致命錯(cuò)誤時(shí)
    if ($exception->getMessage() === 'Some Fatal error') {
        // 將此消息的最大重試次數(shù)設(shè)置為0
        $package['max_attempts'] = 0;
    }
    // 返回修改后的`$package`數(shù)據(jù)結(jié)構(gòu)
    return $package;
});

在非workerman環(huán)境向隊(duì)列發(fā)送消息

有時(shí)候一些項(xiàng)目運(yùn)行在apache或者php-fpm環(huán)境,無(wú)法使用workerman/redis-queue項(xiàng)目,可以參考如下函數(shù)實(shí)現(xiàn)發(fā)送

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前為redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//1.0.5版本之前為redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

其中,參數(shù)$redis為redis實(shí)例。例如redis擴(kuò)展用法類似如下:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
編輯于2024-03-13 17:37:19 完善本頁(yè) +發(fā)起討論
贊助商
×