隨著微服務(wù)的流行, 服務(wù)之間的調(diào)用變得越來越頻繁, 傳統(tǒng)的同步阻塞模式已經(jīng)無法滿足需求,
協(xié)程編程逐漸成為PHP
開發(fā)者的熱門話題, 在引入Fiber
之后, PHP早已支持原生的協(xié)程編程
workerman
是一款高性能的Worker
網(wǎng)絡(luò)服務(wù)框架, 其異步模型幾乎是callback
的方式,
在面對更多復(fù)雜場景時, 我們可能需要同步非堵塞
的方式來編寫代碼, 以便更好地管理服務(wù)之間的調(diào)用
ripple
是一個基于Fiber
實現(xiàn)的協(xié)程引擎,同時提供了workerman
版驅(qū)動,
使得workerman
可以使用ripple
協(xié)程引擎的各種特性
Workerman版本 | 支持狀態(tài) |
---|---|
4.1.x | 長期支持 |
5.0.x | 長期支持 |
截至目前
2024-11-04
, 該引擎處于公測階段, 因此你可能需要配置以確保能夠安裝beta版本
編輯
composer.json
的根節(jié)點
末尾添加
"minimum-stability": "beta",
"prefer-stable": true
composer require cloudtay/workerman-ripple
use Workerman\Worker;
Worker::$eventLoopClass = Workerman\Ripple\Driver::class;
Worker::runAll();
編輯
server.php
return [
'event_loop' => Workerman\Ripple\Driver::class,
// other configurations
];
workerman-ripple
內(nèi)置了幾個常用的組件,下面將以這些組件舉例
此前我們已經(jīng)分享了如何在
workerman
中使用GuzzleHttp
實現(xiàn)異步請求
《「分享創(chuàng)造」在Workerman中使用GuzzleHttp協(xié)程版實現(xiàn)無感異步請求》
下面我將以異步調(diào)用通義千問API
為例,舉例如何使用ripple
解決SSE
場景所遇到的挑戰(zhàn)
use Ripple\Http\Client\Capture\ServerSentEvents;
use Ripple\Http\Guzzle;
/**
* @var \GuzzleHttp\Client $client
*/
# 參照文檔構(gòu)建請求頭和請求體
$token = '輸入你的token';
$header = [
'Content-Type' => 'application/json',
'Authorization' => "Bearer {$token}",
'Accept' => 'text/event-stream',
'X-DashScope-SSE' => 'enable'
];
$body = [
'model' => 'qwen-max',
'input' => [
'model' => 'qwen-max',
'messages' => [
[
'role' => 'system',
'content' => '你是ripple協(xié)程引擎輔導(dǎo)員',
],
[
'role' => 'user',
'content' => '你好',
]
]
],
];
# 創(chuàng)建捕獲器
$capture = new ServerSentEvents();
# 設(shè)置回調(diào)處理器(傳統(tǒng)模式)
$capture->onEvent = static function (array|null $event) {
// TODO: 處理事件
};
$capture->onComplete = static function () {
// TODO: 請求結(jié)束
};
# 創(chuàng)建協(xié)程發(fā)送請求
\Co\async(static function()use($header,$body,$capture){
$client = Guzzle::newClient();
$guzzle->post('https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', [
'headers' => $header,
'json' => $body,
'capture' => $capture
]);
});
# 同步非堵塞模式
foreach ($capture->getIterator() as $event) {
echo json_encode($event) , PHP_EOL;
}
\Co\wait(); // 使用workerman或webman時無需調(diào)用該方法
運行結(jié)果
array(4) {
["id"]=>
string(1) "1"
["event"]=>
string(6) "result"
[""]=>
string(15) "HTTP_STATUS/200"
["data"]=>
string(168) "{"output":{"finish_reason":"null","text":"我是R"},"usage":{"total_tokens":20,"input_tokens":18,"output_tokens":2},"request_id":"87f917d8-f427-92fc-9f44-ed4a559576c3"}"
}
array(4) {
["id"]=>
string(1) "2"
["event"]=>
string(6) "result"
[""]=>
string(15) "HTTP_STATUS/200"
["data"]=>
string(173) "{"output":{"finish_reason":"null","text":"我是Ripple"},"usage":{"total_tokens":21,"input_tokens":18,"output_tokens":3},"request_id":"87f917d8-f427-92fc-9f44-ed4a559576c3"}"
}
...
workerman-ripple 提供了最簡單的
SSE
解決方案,在webman
控制器中只需這么做
use Ripple\Http\Server\Chunk;
use support\Request;
use Workerman\Ripple\Protocols\Http\IteratorResponse;
/**
* @param Request $request
* @return IteratorResponse
*/
public function stream(Request $request): IteratorResponse
{
$iterator = new IteratorResponse(static function () {
foreach (range(1, 10) as $i) {
yield Chunk::event('message', 'hello', $i);
\Co\sleep(0.1);
}
}, $request->connection,true);
return $iterator->withHeaders([
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'Connection' => 'keep-alive',
]);
}
AsyncTcpConnection
是workerman
中最常用的組件之一,但據(jù)用戶反饋在4.x版本中暫不支持通過代理服務(wù)器進(jìn)行連接,我們提供了一種易用的解決方案如下
use Workerman\Connection\TcpConnection;
use Workerman\Ripple\Utils;
/**
* @var \Workerman\Connection\AsyncTcpConnection $tcpConnection
*/
$tcpConnection = Utils::asyncTcpConnection('ssl://ipconfig.io:443');
$tcpConnection->onConnect = static function (TcpConnection $connection) {
echo 'Connected to ipconfig.io' , \PHP_EOL;
$connection->send("GET /ip HTTP/1.1\r\nHost: ipconfig.io\r\nConnection: close\r\n\r\n");
};
$tcpConnection->onMessage = static function (TcpConnection $connection, string $data) {
echo 'Received data from ipconfig.io: ' . \substr($data, 0, 10),'...' . \PHP_EOL;
};
/**
* 通過代理服務(wù)器連接
* 目前支持http,https,socks5代理,格式參考
* socks5://host:port
* socks5://username:password@host:port
*/
$tcpConnection->connectViaProxy('socks5://127.0.0.1:1080');
更多組件的使用方法請參考官方文檔
注意:webman升級到1.6以后,eventloop的配置改到了config/process.php文件中:
return [
'webman' => [
'handler' => Http::class,
'listen' => 'http://0.0.0.0:8086',
'count' => cpu_count() * 4,
'user' => '',
'group' => '',
'reusePort' => false,
'eventLoop' => \Workerman\Ripple\Driver::class,
//'eventLoop' => '',
'context' => [],
'constructor' => [
'requestClass' => Request::class,
'logger' => Log::channel('default'),
'appPath' => app_path(),
'publicPath' => public_path()
]
],
// File update detection and automatic reload
'monitor' => [
。。。。。
強(qiáng)