這段時(shí)間本身比較忙,也很少在關(guān)注技術(shù)相關(guān)的點(diǎn),上個(gè)月空了剛好有時(shí)間看看群里,結(jié)果發(fā)現(xiàn)大家在討論協(xié)程以及webman/workerman的劣勢-阻塞退化問題,本來說是稍稍提兩下實(shí)現(xiàn)方向,結(jié)果一來二去直接弄了一個(gè)插件出來,經(jīng)過反反復(fù)復(fù)修改,最后發(fā)布了webman-coroutine插件
workerman是標(biāo)準(zhǔn)的master/worker多進(jìn)程模型,master只負(fù)責(zé)管理worker,而每個(gè)worker會(huì)啟動(dòng)event-loop進(jìn)行事件監(jiān)聽,這里面包含了stream、timer等事件,所有事件公用一個(gè)event-loop,公用一套調(diào)度體系;每一個(gè)事件回調(diào)會(huì)觸發(fā)注冊的回調(diào)函數(shù),整體是單線程的執(zhí)行調(diào)度,也就是說如果回調(diào)函數(shù)里面有阻塞,那么會(huì)阻塞event-loop的循環(huán),直到回調(diào)函數(shù)執(zhí)行完畢才會(huì)執(zhí)行下一個(gè)事件回調(diào)。
也就是說你把event-loop看作是一個(gè)隊(duì)列,那么回調(diào)函數(shù)就是消費(fèi)者,這個(gè)隊(duì)列是一個(gè)單消費(fèi)者的隊(duì)列,當(dāng)回調(diào)函數(shù)阻塞的時(shí)候,隊(duì)列是沒有其他消費(fèi)者來消費(fèi)回調(diào)的,這也就造成了隊(duì)頭阻塞問題,當(dāng)隊(duì)列buffer被占滿時(shí),生產(chǎn)者將無法投送事件到event-loop中,這會(huì)造成什么問題呢?假設(shè)我們有N個(gè)worker監(jiān)聽8080端口,當(dāng)有消息的時(shí)候會(huì)觸發(fā)一次start()
方法,而start()
方法是一個(gè)while(1){}
的死循環(huán),那么每請(qǐng)求一次將占用一個(gè)worker,導(dǎo)致worker一直在等待start()
執(zhí)行完畢才能釋放控制權(quán)給event-loop,當(dāng)N個(gè)任務(wù)后,所有worker將被占滿,至此,workerman將無法接收8080端口的任何信息。
當(dāng)然,現(xiàn)實(shí)環(huán)境下沒有這么夸張,但是遇到一些長阻塞的方法時(shí)還是會(huì)存在并發(fā)量上不去的問題,那么在傳統(tǒng)workerman的開發(fā)環(huán)境下怎么處理呢?開多一點(diǎn)worker;其實(shí)你把它看成一個(gè)消息隊(duì)列就好理解,當(dāng)消費(fèi)能力上不去的時(shí)候,要么減少消費(fèi)阻塞時(shí)長,要么就是增加消費(fèi)者。webman也同理,因?yàn)閣ebman是在事件回調(diào)函數(shù)內(nèi)進(jìn)行框架的加載和控制器方法的執(zhí)行的。
有朋友會(huì)說,webman/workerman可以使用swoole作為底層驅(qū)動(dòng),只要安裝swoole并將workerman的驅(qū)動(dòng)設(shè)置為Swoole即可使用協(xié)程了;這種說法并不完全正確。
以下是workerman 4.x的swoole驅(qū)動(dòng)實(shí)現(xiàn):
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author Ares<aresrr#qq.com>
* @link http://www.wtbis.cn/
* @link https://github.com/ares333/Workerman
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
use Swoole\Event;
use Swoole\Timer;
class Swoole implements EventInterface
{
protected $_timer = array();
protected $_timerOnceMap = array();
protected $mapId = 0;
protected $_fd = array();
// milisecond
public static $signalDispatchInterval = 500;
protected $_hasSignal = false;
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::add()
*/
public function add($fd, $flag, $func, $args = array())
{
switch ($flag) {
case self::EV_SIGNAL:
$res = \pcntl_signal($fd, $func, false);
if (! $this->_hasSignal && $res) {
Timer::tick(static::$signalDispatchInterval,
function () {
\pcntl_signal_dispatch();
});
$this->_hasSignal = true;
}
return $res;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$method = self::EV_TIMER === $flag ? 'tick' : 'after';
if ($this->mapId > \PHP_INT_MAX) {
$this->mapId = 0;
}
$mapId = $this->mapId++;
$t = (int)($fd * 1000);
if ($t < 1) {
$t = 1;
}
$timer_id = Timer::$method($t,
function ($timer_id = null) use ($func, $args, $mapId) {
try {
\call_user_func_array($func, (array)$args);
} catch (\Exception $e) {
Worker::stopAll(250, $e);
} catch (\Error $e) {
Worker::stopAll(250, $e);
}
// EV_TIMER_ONCE
if (! isset($timer_id)) {
// may be deleted in $func
if (\array_key_exists($mapId, $this->_timerOnceMap)) {
$timer_id = $this->_timerOnceMap[$mapId];
unset($this->_timer[$timer_id],
$this->_timerOnceMap[$mapId]);
}
}
});
if ($flag === self::EV_TIMER_ONCE) {
$this->_timerOnceMap[$mapId] = $timer_id;
$this->_timer[$timer_id] = $mapId;
} else {
$this->_timer[$timer_id] = null;
}
return $timer_id;
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int) $fd;
if (! isset($this->_fd[$fd_key])) {
if ($flag === self::EV_READ) {
$res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
$fd_type = SWOOLE_EVENT_READ;
} else {
$res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
$fd_type = SWOOLE_EVENT_WRITE;
}
if ($res) {
$this->_fd[$fd_key] = $fd_type;
}
} else {
$fd_val = $this->_fd[$fd_key];
$res = true;
if ($flag === self::EV_READ) {
if (($fd_val & SWOOLE_EVENT_READ) !== SWOOLE_EVENT_READ) {
$res = Event::set($fd, $func, null,
SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
$this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
}
} else {
if (($fd_val & SWOOLE_EVENT_WRITE) !== SWOOLE_EVENT_WRITE) {
$res = Event::set($fd, null, $func,
SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
$this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
}
}
}
return $res;
}
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::del()
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_SIGNAL:
return \pcntl_signal($fd, SIG_IGN, false);
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
// already remove in EV_TIMER_ONCE callback.
if (! \array_key_exists($fd, $this->_timer)) {
return true;
}
$res = Timer::clear($fd);
if ($res) {
$mapId = $this->_timer[$fd];
if (isset($mapId)) {
unset($this->_timerOnceMap[$mapId]);
}
unset($this->_timer[$fd]);
}
return $res;
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int) $fd;
if (isset($this->_fd[$fd_key])) {
$fd_val = $this->_fd[$fd_key];
if ($flag === self::EV_READ) {
$flag_remove = ~ SWOOLE_EVENT_READ;
} else {
$flag_remove = ~ SWOOLE_EVENT_WRITE;
}
$fd_val &= $flag_remove;
if (0 === $fd_val) {
$res = Event::del($fd);
if ($res) {
unset($this->_fd[$fd_key]);
}
} else {
$res = Event::set($fd, null, null, $fd_val);
if ($res) {
$this->_fd[$fd_key] = $fd_val;
}
}
} else {
$res = true;
}
return $res;
}
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::clearAllTimer()
*/
public function clearAllTimer()
{
foreach (array_keys($this->_timer) as $v) {
Timer::clear($v);
}
$this->_timer = array();
$this->_timerOnceMap = array();
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::loop()
*/
public function loop()
{
Event::wait();
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::destroy()
*/
public function destroy()
{
Event::exit();
posix_kill(posix_getpid(), SIGINT);
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::getTimerCount()
*/
public function getTimerCount()
{
return \count($this->_timer);
}
}
我們可以看到確實(shí)正確加載了Swoole的event-loop驅(qū)動(dòng),但僅僅也只是加載了event-loop,并沒有在回調(diào)的注冊部分加入?yún)f(xié)程,那么就相當(dāng)于僅僅只是寫了一個(gè)\Co\run()
,但是沒有在\Co\run()
中創(chuàng)建協(xié)程進(jìn)行運(yùn)行,那么意味著當(dāng)事件的回調(diào)函數(shù)中當(dāng)監(jiān)聽8080端口進(jìn)行處理,遇到了阻塞的時(shí)候還是無法出讓當(dāng)前控制權(quán)給event-loop,event-loop就沒辦法執(zhí)行下一個(gè)8080端口的事件,為什么會(huì)這樣呢?因?yàn)閣orkerman使用stream_socket_server()
對(duì)外部網(wǎng)絡(luò)進(jìn)行監(jiān)聽,而如下代碼又會(huì)等待回調(diào):
// Workerman\Worker 2465-2476行
public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}
那么即便swoole底層hook了系統(tǒng)函數(shù),也只是將mainSocket的回調(diào)出讓,但來自相同mainSocket的下一次事件是需要上一次事件完結(jié)恢復(fù)才可以繼續(xù)接收的。
以上的問題會(huì)導(dǎo)致什么樣的問題呢?
傳統(tǒng)解決方案:多開worker
傳統(tǒng)解決方案:自定義進(jìn)程實(shí)現(xiàn) 或 使用外部服務(wù)
基于上述情況,我開發(fā)了webman/workerman可用的協(xié)程基建插件,webman-coroutine;
插件通過適配器模式和工廠模式的方法去兼容現(xiàn)目前市面上比較常見的幾種協(xié)程驅(qū)動(dòng)swow、swoole、php-fiber(ripple實(shí)現(xiàn)),將不同的底層驅(qū)動(dòng)抽象適配為統(tǒng)一的調(diào)用方法,并且兼容非協(xié)程環(huán)境,也就意味著你用同一套代碼寫出來的業(yè)務(wù)可以較為平滑的切換在這些環(huán)境及非協(xié)程環(huán)境之間,且保證邏輯是正常運(yùn)行。
插件為webman的開發(fā)框架重新實(shí)現(xiàn)了webserver,讓原本不完備支持協(xié)程的框架可以完備的支持協(xié)程:
<?php
/**
* @author workbunny/Chaz6chez
* @email chaz6chez1993@outlook.com
*/
declare(strict_types=1);
namespace Workbunny\WebmanCoroutine;
use Webman\App;
use Webman\Http\Request;
use Workbunny\WebmanCoroutine\Handlers\HandlerInterface;
use Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine;
use Workbunny\WebmanCoroutine\Utils\WaitGroup\WaitGroup;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;
/**
* 協(xié)程化web服務(wù)進(jìn)程
*/
class CoroutineWebServer extends App
{
/**
* 每個(gè)連接的協(xié)程計(jì)數(shù)
*
* @var int[]
*/
protected static array $_connectionCoroutineCount = [];
/**
* 獲取連接的協(xié)程計(jì)數(shù)
*
* @return int[]|int
*/
public static function getConnectionCoroutineCount(?string $connectionId = null): array|int
{
return $connectionId === null
? static::$_connectionCoroutineCount
: (static::$_connectionCoroutineCount[$connectionId] ?? 0);
}
/**
* 回收連接的協(xié)程計(jì)數(shù)
*
* @param string $connectionId
* @param bool $force
* @return void
*/
public static function unsetConnectionCoroutineCount(string $connectionId, bool $force = false): void
{
if (!$force and self::getConnectionCoroutineCount($connectionId) > 0) {
return;
}
unset(static::$_connectionCoroutineCount[$connectionId]);
}
/** @inheritdoc */
public function onWorkerStart($worker)
{
if (!\config('plugin.workbunny.webman-coroutine.app.enable', false)) {
return;
}
parent::onWorkerStart($worker);
/** @var HandlerInterface $handler */
$handler = Factory::getCurrentHandler();
$handler::initEnv();
}
/**
* 停止服務(wù)
*
* - 不用返回值和參數(shù)標(biāo)定是為了兼容
*
* @param Worker|mixed $worker
* @return void
*/
public function onWorkerStop($worker, ...$params)
{
if (is_callable($call = [parent::class, 'onWorkerStop'])) {
call_user_func($call, $worker, ...$params);
}
}
/**
* 連接建立
*
* - 不用返回值和參數(shù)標(biāo)定是為了兼容
*
* @param ConnectionInterface $connection
* @param mixed ...$params
* @return void
*/
public function onConnect($connection, ...$params): void
{
if (!is_object($connection)) {
return;
}
if (is_callable($call = [parent::class, 'onConnect'])) {
// 協(xié)程化創(chuàng)建連接
new Coroutine(function () use ($call, $connection, $params) {
call_user_func($call, $connection, ...$params);
});
}
}
/**
* 連接關(guān)閉
*
* - 不用返回值和參數(shù)標(biāo)定是為了兼容
*
* @param ConnectionInterface|mixed $connection
* @param ...$params
* @return void
*/
public function onClose($connection, ...$params)
{
if (!is_object($connection)) {
return;
}
if (is_callable($call = [parent::class, 'onClose'])) {
// 協(xié)程化關(guān)閉連接
new Coroutine(function () use ($call, $connection, $params) {
call_user_func($call, $connection, ...$params);
});
}
self::unsetConnectionCoroutineCount(spl_object_hash($connection), true);
}
/**
* @link parent::onMessage()
* @param ConnectionInterface|mixed $connection
* @param Request|mixed $request
* @param ...$params
* @return null
* @link parent::onMessage()
*/
public function onMessage($connection, $request, ...$params)
{
if (!is_object($connection)) {
return null;
}
$connectionId = spl_object_hash($connection);
$params = func_get_args();
$res = null;
// 檢測協(xié)程數(shù)
if (($consumerCount = \config('plugin.workbunny.webman-coroutine.app.consumer_count', 0)) > 0) {
// 等待協(xié)程回收
wait_for(function () use ($connectionId, $consumerCount) {
return self::getConnectionCoroutineCount($connectionId) <= $consumerCount;
});
}
$waitGroup = new WaitGroup();
$waitGroup->add();
// 請(qǐng)求消費(fèi)協(xié)程
new Coroutine(function () use (&$res, $waitGroup, $params, $connectionId) {
$res = parent::onMessage(...$params);
// 計(jì)數(shù) --
self::$_connectionCoroutineCount[$connectionId] --;
// 嘗試回收
self::unsetConnectionCoroutineCount($connectionId);
// wg完成
$waitGroup->done();
});
// 計(jì)數(shù) ++
self::$_connectionCoroutineCount[$connectionId] =
(isset(self::$_connectionCoroutineCount[$connectionId])
? self::$_connectionCoroutineCount[$connectionId] + 1
: 1);
// 等待
$waitGroup->wait();
return $res;
}
}
CoroutineWebServer是繼承并代理了App的onMessage方法,將原本的方法執(zhí)行回調(diào)化,并且做到了非侵入onMessage的執(zhí)行邏輯,較為安全的支持了未來webman可能的升級(jí)改動(dòng)。
另外對(duì)于workerman 4.x下的event驅(qū)動(dòng)也做了兼容,除了增加了swow的事件驅(qū)動(dòng)外,還重新實(shí)現(xiàn)了swoole的事件驅(qū)動(dòng):
<?php
/**
* @author workbunny/Chaz6chez
* @email chaz6chez1993@outlook.com
*/
declare(strict_types=1);
namespace Workbunny\WebmanCoroutine\Events;
use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Workbunny\WebmanCoroutine\Exceptions\EventLoopException;
use Workerman\Events\EventInterface;
class SwooleEvent implements EventInterface
{
/** @var int[] All listeners for read event. */
protected array $_reads = [];
/** @var int[] All listeners for write event. */
protected array $_writes = [];
/** @var callable[] Event listeners of signal. */
protected array $_signals = [];
/** @var int[] Timer id to timer info. */
protected array $_timer = [];
/** @var int 定時(shí)器id */
protected int $_timerId = 0;
/**
* @param bool $debug 測試用
* @throws EventLoopException 如果沒有啟用拓展
*/
public function __construct(bool $debug = false)
{
if (!$debug and !extension_loaded('swoole')) {
throw new EventLoopException('Not support ext-swoole. ');
}
}
/** @inheritdoc */
public function add($fd, $flag, $func, $args = [])
{
switch ($flag) {
case EventInterface::EV_SIGNAL:
if (!isset($this->_signals[$fd])) {
if ($res = Process::signal($fd, $func)) {
$this->_signals[$fd] = $func;
}
return $res;
}
return false;
case EventInterface::EV_TIMER:
case EventInterface::EV_TIMER_ONCE:
$timerId = $this->_timerId++;
$this->_timer[$timerId] = Timer::after((int) ($fd * 1000), function () use ($timerId, $flag, $func) {
call_user_func($func);
if ($flag === EventInterface::EV_TIMER_ONCE) {
$this->del($timerId, $flag);
}
});
return $timerId;
case EventInterface::EV_READ:
if (\is_resource($fd)) {
if ($this->_reads[$key = (int) $fd] ?? null) {
$this->del($fd, EventInterface::EV_READ);
}
if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_READ)) {
$this->_reads[$key] = 1;
}
return (bool) $res;
}
return false;
case self::EV_WRITE:
if (\is_resource($fd)) {
if ($this->_writes[$key = (int) $fd] ?? null) {
$this->del($fd, EventInterface::EV_WRITE);
}
if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_WRITE)) {
$this->_writes[$key] = 1;
}
return (bool) $res;
}
return false;
default:
return null;
}
}
/** @inheritdoc */
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_SIGNAL:
if ($this->_signals[$fd] ?? null) {
if (Process::signal($fd, null)) {
unset($this->_signals[$fd]);
return true;
}
}
return false;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if ($id = $this->_timer[$fd] ?? null) {
if (Timer::clear($id)) {
unset($this->_timer[$fd]);
return true;
}
}
return false;
case self::EV_READ:
if (
\is_resource($fd) and
isset($this->_reads[$key = (int) $fd]) and
Event::isset($fd, SWOOLE_EVENT_READ)
) {
if (Event::del($fd)) {
unset($this->_reads[$key]);
return true;
}
}
return false;
case self::EV_WRITE:
if (
\is_resource($fd) and
isset($this->_writes[$key = (int) $fd]) and
Event::isset($fd, SWOOLE_EVENT_WRITE)
) {
if (Event::del($fd)) {
unset($this->_writes[$key]);
return true;
}
}
return false;
default:
return null;
}
}
/** @inheritdoc */
public function loop()
{
// 阻塞等待
Event::wait();
// 確定loop為退出狀態(tài)
exit(0);
}
/** @inheritdoc */
public function destroy()
{
// 移除所有定時(shí)器
$this->clearAllTimer();
// 退出所有協(xié)程
foreach (Coroutine::listCoroutines() as $coroutine) {
Coroutine::cancel($coroutine);
}
// 退出event loop
Event::exit();
$this->_reads = $this->_writes = [];
}
/** @inheritdoc */
public function clearAllTimer()
{
foreach ($this->_timer as $id) {
Timer::clear($id);
}
$this->_timer = [];
}
/** @inheritdoc */
public function getTimerCount()
{
return count($this->_timer);
}
}
在測試workerman 5.x的過程中還找到了一些workerman的swoole驅(qū)動(dòng)的bug,我進(jìn)行了PR,積極參與維護(hù),fix: all coroutines must be canceled before Event::exit #1059
其他更多特性及功能請(qǐng)參考插件文檔,插件也支持純workerman開發(fā)環(huán)境,webman-coroutine文檔
$a = new \stdClass();
$a->id = 1;
new Coroutine(function () use ($a) {
// 一些業(yè)務(wù)邏輯
$a->id = 2;
})
new Coroutine(function () use ($a) {
// 一些業(yè)務(wù)邏輯
$a->id = 3;
})
// 等待所有協(xié)程結(jié)束
// 由于每個(gè)協(xié)程的邏輯中可能存在協(xié)程切換出讓,結(jié)合對(duì)象是堆數(shù)據(jù)且引用,最后的結(jié)果不能保證是1或者2或者3
// 數(shù)組同理
echo $a->id;
$a = 1;
new Coroutine(function () use (&$a) {
// 一些業(yè)務(wù)邏輯
$a = 2;
})
new Coroutine(function () use (&$a) {
// 一些業(yè)務(wù)邏輯
$a = 3;
})
// 等待所有協(xié)程結(jié)束
// 由于每個(gè)協(xié)程的邏輯中可能存在協(xié)程切換出讓,變量是引用,最后的結(jié)果不能保證是1或者2或者3
echo $a;
static array $context = [];
$a = 1;
$id1 = new Coroutine(function () use (&$id1) {
$contextA = self::$context[$id]
// 一些業(yè)務(wù)邏輯
self::$context[$id1] = 2;
})
self::$context[$id1] = $a;
$id2 = new Coroutine(function () use (&$id2) {
$contextB = self::$context[$id]
// 一些業(yè)務(wù)邏輯
self::$context[$id2] = 3;
})
self::$context[$id1] = $a;
// 等待所有協(xié)程結(jié)束
// 這里會(huì)輸出1
echo $a;
// 讀取上下文內(nèi)容,獲取協(xié)程結(jié)果, 一般這里不推薦直接上下文讀取,而是通過CSP模型的channel進(jìn)行傳遞
// 還要注意上下文的回收,避免靜態(tài)數(shù)組膨脹
echo self::$context[$id1];
echo self::$context[$id2];
// 以上并不是完整的上下文實(shí)現(xiàn)方案,只是一個(gè)偽代碼??!
PDO在發(fā)送SQL后會(huì)阻塞等待SQL的執(zhí)行結(jié)果,swow和swoole在底層hook了阻塞等待的過程,進(jìn)行了協(xié)程切換
以pdo的mysql舉例:
// https://github.com/php/php-src/blob/master/ext/pdo_mysql/mysql_driver.c
static zend_long mysql_handle_doer(pdo_dbh_t *dbh, const zend_string *sql)
{
pdo_mysql_db_handle *H = (pdo_mysql_db_handle *)dbh->driver_data;
PDO_DBG_ENTER("mysql_handle_doer");
PDO_DBG_INF_FMT("dbh=%p", dbh);
PDO_DBG_INF_FMT("sql=%.*s", (int)ZSTR_LEN(sql), ZSTR_VAL(sql));
if (mysql_real_query(H->server, ZSTR_VAL(sql), ZSTR_LEN(sql))) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(-1);
} else {
my_ulonglong c = mysql_affected_rows(H->server);
if (c == (my_ulonglong) -1) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(H->einfo.errcode ? -1 : 0);
} else {
/* MULTI_QUERY support - eat up all unfetched result sets */
MYSQL_RES* result;
while (mysql_more_results(H->server)) {
if (mysql_next_result(H->server)) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(-1);
}
result = mysql_store_result(H->server);
if (result) {
mysql_free_result(result);
}
}
PDO_DBG_RETURN((int)c);
}
}
}
以上代碼可以簡單理解為以下偽代碼
$requestId = $mysqlClient->send('SQL');
while (1) {
$res = $mysqlClient->get($requestId);
if ($res) {
return $res;
}
// 超時(shí)等其他機(jī)制
// 協(xié)程sleep出讓
}
$mysqlClient->send('SQL');
由DB服務(wù)器接收并依次執(zhí)行(來源于同一個(gè)連接的多次SQL是順序執(zhí)行),但可能存在后者的協(xié)程結(jié)果喚起了前者協(xié)程的$res = $mysqlClient->get($requestId);
,從而導(dǎo)致數(shù)據(jù)錯(cuò)亂;這里本質(zhì)上是因?yàn)镻DO對(duì)象是堆數(shù)據(jù),在多個(gè)協(xié)程中是競態(tài)的,為了避免這樣的情況,有以下方案解決:
目前webman/workerman的協(xié)程實(shí)現(xiàn)僅僅只是入了個(gè)門,主要解決了阻塞退化問題,能夠簡單的實(shí)現(xiàn)以下場景:
但還有很多基建需要社區(qū)出謀出力添磚加瓦,比如:
當(dāng)然,在此之前,你可以使用所有基于swow\swoole\ripple\revolt協(xié)程驅(qū)動(dòng)開發(fā)的協(xié)程版組件,但我希望未來可以整合這些協(xié)程實(shí)現(xiàn)的組件,能夠有一個(gè)統(tǒng)一的使用方式(雖然難度相當(dāng)大,但也想試試);
歡迎大佬們共建,issue和PR!
文章如有錯(cuò)誤,敬請(qǐng)指正。謝謝?。?/strong>
目前已經(jīng)實(shí)現(xiàn)了較為基礎(chǔ)的Utils\Pool
工具,可用于對(duì)象池化的實(shí)現(xiàn)
Utils\Poo\Debugger
可用于檢測待池化的對(duì)象是否存在非法和風(fēng)險(xiǎn),拋出的異常可以自行捕獲進(jìn)行日志監(jiān)控或者是調(diào)試,詳細(xì)參考測試用例
Utils\Poo\Pool
可用于實(shí)現(xiàn)連接池、資源鎖等,具體可參考文檔建議和意見都可以提交issue
歡迎各位大佬的PR!
好文~
????????????
????????????
????????????
????????????
????????????
????????????
大佬,請(qǐng)問這個(gè)插件有生產(chǎn)環(huán)境使用嗎?我目前正在為公司項(xiàng)目尋找框架,選定webman+gatewaywork,但是苦于沒有協(xié)程,看到你這個(gè)插件,想用,又擔(dān)心有坑
大佬,讀了下文檔,有以下幾個(gè)問題不懂
1、webman 的webserver 要同時(shí)開啟 CoroutineWebServer和webman自帶的server嗎?那樣豈不是對(duì)外暴露了2個(gè)端口,webman自帶的可以去掉嗎?
2、操作數(shù)據(jù)庫可以用hypderf/database 解決連接池問題嗎?
3、上下文,用webman自帶的context類保存包括對(duì)象,數(shù)組這些數(shù)據(jù)類型可以嗎?還是必須要用協(xié)程id作為key區(qū)別開來
1.可以關(guān)閉webman自帶的server
2.使用swoole驅(qū)動(dòng)可以使用hyperf的db,但需要自己引入框架
3.一般使用無需關(guān)注上下文,跨協(xié)程處理數(shù)據(jù)建議使用channel
我想用到的上下文其實(shí)就是想保存當(dāng)前這個(gè)請(qǐng)求的全局變量,類似fpm下的靜態(tài)屬性,處理完這個(gè)請(qǐng)求就銷毀的
3.我理解的實(shí)際上swoole的協(xié)程都在同一個(gè)進(jìn)程空間,可以共同使用進(jìn)程內(nèi)存資源,所以多個(gè)協(xié)程利用同一個(gè)數(shù)組保存數(shù)據(jù)是ok的,而webman自帶的Context類就是根據(jù)協(xié)程id作為數(shù)組key,區(qū)分不同的協(xié)程資源的,適用的場景就是不需要協(xié)程間協(xié)作的場景;
而使用swoole的channel適用于需要協(xié)程間協(xié)作的場景,比如,一次請(qǐng)求下要記錄到數(shù)據(jù)庫、記錄日志,并將記錄的結(jié)果響應(yīng)給調(diào)用方,這種情況下解決方案就是:
1.父協(xié)程創(chuàng)建協(xié)程A和協(xié)程B分別處理這兩件事
2.父協(xié)程阻塞當(dāng)前協(xié)程,等待協(xié)程A和協(xié)程B的執(zhí)行結(jié)果
3.父協(xié)程得到了結(jié)果,并響應(yīng)給調(diào)用放。
上述的情況使用共享變量的情況下就很難處理,想要阻塞父協(xié)程,不使用內(nèi)置的waitGroup的情況下,就必須在父協(xié)程寫個(gè)循環(huán)體,并且為了不阻塞進(jìn)程,還需要寫個(gè)IO,即sleep ,while(true){ sleep(1)}
而使用channel就很方便,直接在父協(xié)程pop兩次,協(xié)程A完成后push(),協(xié)程B完成后push(),偽代碼:
$chan = new Channel();
go(function(){
//
$chan->push()
})
go(function(){
//
$chan->push()
})
$chan->pop()
$chan->pop()
return response("ok");
workerman如果使用協(xié)程,協(xié)程也在一個(gè)進(jìn)程空間,數(shù)據(jù)引用在協(xié)程之間是不隔離的,比如&的數(shù)據(jù),比如對(duì)象,比如資源類型,比如數(shù)組,只要是引用類型的數(shù)據(jù)都存在協(xié)程間的競爭狀態(tài),這樣的競爭狀態(tài)會(huì)導(dǎo)致數(shù)據(jù)可能被污染,為了達(dá)到數(shù)據(jù)不被污染的效果,除了對(duì)數(shù)據(jù)加鎖外,還可以通過channel進(jìn)行有序傳遞,也就是當(dāng)一個(gè)數(shù)據(jù)正在被一個(gè)協(xié)程消費(fèi)的時(shí)候,其他的協(xié)程是沒有從通道內(nèi)獲取到數(shù)據(jù)的,直到獲取到數(shù)據(jù)的協(xié)程消費(fèi)完畢,變相的其實(shí)也是一種鎖的機(jī)制,在pop不到數(shù)據(jù)的時(shí)候協(xié)程會(huì)自動(dòng)出讓控制權(quán);不論是上下文還是通道還是sync鎖,都是一種競爭數(shù)據(jù)的并發(fā)安全操作。
大佬厲害