国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

webman/workerman的協(xié)程基建套件及分享

chaz6chez

前言

這段時(shí)間本身比較忙,也很少在關(guān)注技術(shù)相關(guān)的點(diǎn),上個(gè)月空了剛好有時(shí)間看看群里,結(jié)果發(fā)現(xiàn)大家在討論協(xié)程以及webman/workerman的劣勢-阻塞退化問題,本來說是稍稍提兩下實(shí)現(xiàn)方向,結(jié)果一來二去直接弄了一個(gè)插件出來,經(jīng)過反反復(fù)復(fù)修改,最后發(fā)布了webman-coroutine插件

現(xiàn)狀

workerman/webman的阻塞退化問題

workerman是標(biāo)準(zhǔn)的master/worker多進(jìn)程模型,master只負(fù)責(zé)管理worker,而每個(gè)worker會(huì)啟動(dòng)event-loop進(jìn)行事件監(jiān)聽,這里面包含了stream、timer等事件,所有事件公用一個(gè)event-loop,公用一套調(diào)度體系;每一個(gè)事件回調(diào)會(huì)觸發(fā)注冊的回調(diào)函數(shù),整體是單線程的執(zhí)行調(diào)度,也就是說如果回調(diào)函數(shù)里面有阻塞,那么會(huì)阻塞event-loop的循環(huán),直到回調(diào)函數(shù)執(zhí)行完畢才會(huì)執(zhí)行下一個(gè)事件回調(diào)。

也就是說你把event-loop看作是一個(gè)隊(duì)列,那么回調(diào)函數(shù)就是消費(fèi)者,這個(gè)隊(duì)列是一個(gè)單消費(fèi)者的隊(duì)列,當(dāng)回調(diào)函數(shù)阻塞的時(shí)候,隊(duì)列是沒有其他消費(fèi)者來消費(fèi)回調(diào)的,這也就造成了隊(duì)頭阻塞問題,當(dāng)隊(duì)列buffer被占滿時(shí),生產(chǎn)者將無法投送事件到event-loop中,這會(huì)造成什么問題呢?假設(shè)我們有N個(gè)worker監(jiān)聽8080端口,當(dāng)有消息的時(shí)候會(huì)觸發(fā)一次start()方法,而start()方法是一個(gè)while(1){}的死循環(huán),那么每請(qǐng)求一次將占用一個(gè)worker,導(dǎo)致worker一直在等待start()執(zhí)行完畢才能釋放控制權(quán)給event-loop,當(dāng)N個(gè)任務(wù)后,所有worker將被占滿,至此,workerman將無法接收8080端口的任何信息。

當(dāng)然,現(xiàn)實(shí)環(huán)境下沒有這么夸張,但是遇到一些長阻塞的方法時(shí)還是會(huì)存在并發(fā)量上不去的問題,那么在傳統(tǒng)workerman的開發(fā)環(huán)境下怎么處理呢?開多一點(diǎn)worker;其實(shí)你把它看成一個(gè)消息隊(duì)列就好理解,當(dāng)消費(fèi)能力上不去的時(shí)候,要么減少消費(fèi)阻塞時(shí)長,要么就是增加消費(fèi)者。webman也同理,因?yàn)閣ebman是在事件回調(diào)函數(shù)內(nèi)進(jìn)行框架的加載和控制器方法的執(zhí)行的。

workerman swoole驅(qū)動(dòng)未使用協(xié)程

有朋友會(huì)說,webman/workerman可以使用swoole作為底層驅(qū)動(dòng),只要安裝swoole并將workerman的驅(qū)動(dòng)設(shè)置為Swoole即可使用協(xié)程了;這種說法并不完全正確。

以下是workerman 4.x的swoole驅(qū)動(dòng)實(shí)現(xiàn):


<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    Ares<aresrr#qq.com>
 * @link      http://www.wtbis.cn/
 * @link      https://github.com/ares333/Workerman
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Events;

use Workerman\Worker;
use Swoole\Event;
use Swoole\Timer;

class Swoole implements EventInterface
{

    protected $_timer = array();

    protected $_timerOnceMap = array();

    protected $mapId = 0;

    protected $_fd = array();

    // milisecond
    public static $signalDispatchInterval = 500;

    protected $_hasSignal = false;

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::add()
     */
    public function add($fd, $flag, $func, $args = array())
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                $res = \pcntl_signal($fd, $func, false);
                if (! $this->_hasSignal && $res) {
                    Timer::tick(static::$signalDispatchInterval,
                        function () {
                            \pcntl_signal_dispatch();
                        });
                    $this->_hasSignal = true;
                }
                return $res;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                $method = self::EV_TIMER === $flag ? 'tick' : 'after';
                if ($this->mapId > \PHP_INT_MAX) {
                    $this->mapId = 0;
                }
                $mapId = $this->mapId++;
                $t = (int)($fd * 1000);
                if ($t < 1) {
                   $t = 1;   
                }
                $timer_id = Timer::$method($t,
                    function ($timer_id = null) use ($func, $args, $mapId) {
                        try {
                            \call_user_func_array($func, (array)$args);
                        } catch (\Exception $e) {
                            Worker::stopAll(250, $e);
                        } catch (\Error $e) {
                            Worker::stopAll(250, $e);
                        }
                        // EV_TIMER_ONCE
                        if (! isset($timer_id)) {
                            // may be deleted in $func
                            if (\array_key_exists($mapId, $this->_timerOnceMap)) {
                                $timer_id = $this->_timerOnceMap[$mapId];
                                unset($this->_timer[$timer_id],
                                    $this->_timerOnceMap[$mapId]);
                            }
                        }
                    });
                if ($flag === self::EV_TIMER_ONCE) {
                    $this->_timerOnceMap[$mapId] = $timer_id;
                    $this->_timer[$timer_id] = $mapId;
                } else {
                    $this->_timer[$timer_id] = null;
                }
                return $timer_id;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (! isset($this->_fd[$fd_key])) {
                    if ($flag === self::EV_READ) {
                        $res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
                        $fd_type = SWOOLE_EVENT_READ;
                    } else {
                        $res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
                        $fd_type = SWOOLE_EVENT_WRITE;
                    }
                    if ($res) {
                        $this->_fd[$fd_key] = $fd_type;
                    }
                } else {
                    $fd_val = $this->_fd[$fd_key];
                    $res = true;
                    if ($flag === self::EV_READ) {
                        if (($fd_val & SWOOLE_EVENT_READ) !== SWOOLE_EVENT_READ) {
                            $res = Event::set($fd, $func, null,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
                        }
                    } else {
                        if (($fd_val & SWOOLE_EVENT_WRITE) !== SWOOLE_EVENT_WRITE) {
                            $res = Event::set($fd, null, $func,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
                        }
                    }
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::del()
     */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return \pcntl_signal($fd, SIG_IGN, false);
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                // already remove in EV_TIMER_ONCE callback.
                if (! \array_key_exists($fd, $this->_timer)) {
                    return true;
                }
                $res = Timer::clear($fd);
                if ($res) {
                    $mapId = $this->_timer[$fd];
                    if (isset($mapId)) {
                        unset($this->_timerOnceMap[$mapId]);
                    }
                    unset($this->_timer[$fd]);
                }
                return $res;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (isset($this->_fd[$fd_key])) {
                    $fd_val = $this->_fd[$fd_key];
                    if ($flag === self::EV_READ) {
                        $flag_remove = ~ SWOOLE_EVENT_READ;
                    } else {
                        $flag_remove = ~ SWOOLE_EVENT_WRITE;
                    }
                    $fd_val &= $flag_remove;
                    if (0 === $fd_val) {
                        $res = Event::del($fd);
                        if ($res) {
                            unset($this->_fd[$fd_key]);
                        }
                    } else {
                        $res = Event::set($fd, null, null, $fd_val);
                        if ($res) {
                            $this->_fd[$fd_key] = $fd_val;
                        }
                    }
                } else {
                    $res = true;
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::clearAllTimer()
     */
    public function clearAllTimer()
    {
        foreach (array_keys($this->_timer) as $v) {
            Timer::clear($v);
        }
        $this->_timer = array();
        $this->_timerOnceMap = array();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::loop()
     */
    public function loop()
    {
        Event::wait();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::destroy()
     */
    public function destroy()
    {
        Event::exit();
        posix_kill(posix_getpid(), SIGINT);
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::getTimerCount()
     */
    public function getTimerCount()
    {
        return \count($this->_timer);
    }
}

我們可以看到確實(shí)正確加載了Swoole的event-loop驅(qū)動(dòng),但僅僅也只是加載了event-loop,并沒有在回調(diào)的注冊部分加入?yún)f(xié)程,那么就相當(dāng)于僅僅只是寫了一個(gè)\Co\run(),但是沒有在\Co\run()中創(chuàng)建協(xié)程進(jìn)行運(yùn)行,那么意味著當(dāng)事件的回調(diào)函數(shù)中當(dāng)監(jiān)聽8080端口進(jìn)行處理,遇到了阻塞的時(shí)候還是無法出讓當(dāng)前控制權(quán)給event-loop,event-loop就沒辦法執(zhí)行下一個(gè)8080端口的事件,為什么會(huì)這樣呢?因?yàn)閣orkerman使用stream_socket_server()對(duì)外部網(wǎng)絡(luò)進(jìn)行監(jiān)聽,而如下代碼又會(huì)等待回調(diào):

    // Workerman\Worker 2465-2476行
    public function resumeAccept()
    {
        // Register a listener to be notified when server socket is ready to read.
        if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
            if ($this->transport !== 'udp') {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
            } else {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
            }
            $this->_pauseAccept = false;
        }
    }

那么即便swoole底層hook了系統(tǒng)函數(shù),也只是將mainSocket的回調(diào)出讓,但來自相同mainSocket的下一次事件是需要上一次事件完結(jié)恢復(fù)才可以繼續(xù)接收的。

導(dǎo)致

以上的問題會(huì)導(dǎo)致什么樣的問題呢?

  • 非刻意的阻塞將worker占滿,極端情況降低吞吐承載力
    • PDO
    • curl
    • 文件讀寫
    • 等等 blocking-I/O相關(guān)

傳統(tǒng)解決方案:多開worker

  • 因?yàn)閮?nèi)外共用event-loop,刻意的阻塞實(shí)現(xiàn)會(huì)將worker占滿,導(dǎo)致無法接收處理外部網(wǎng)絡(luò)請(qǐng)求
    • 長輪詢接口
    • http-sse
    • 一些長連接場景
    • 帶有阻塞業(yè)務(wù)的timer
    • 隊(duì)列 生產(chǎn)/消費(fèi)
    • 等等

傳統(tǒng)解決方案:自定義進(jìn)程實(shí)現(xiàn) 或 使用外部服務(wù)

解決

基于上述情況,我開發(fā)了webman/workerman可用的協(xié)程基建插件,webman-coroutine;

插件通過適配器模式和工廠模式的方法去兼容現(xiàn)目前市面上比較常見的幾種協(xié)程驅(qū)動(dòng)swow、swoole、php-fiber(ripple實(shí)現(xiàn)),將不同的底層驅(qū)動(dòng)抽象適配為統(tǒng)一的調(diào)用方法,并且兼容非協(xié)程環(huán)境,也就意味著你用同一套代碼寫出來的業(yè)務(wù)可以較為平滑的切換在這些環(huán)境及非協(xié)程環(huán)境之間,且保證邏輯是正常運(yùn)行。

插件為webman的開發(fā)框架重新實(shí)現(xiàn)了webserver,讓原本不完備支持協(xié)程的框架可以完備的支持協(xié)程:

<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */
declare(strict_types=1);

namespace Workbunny\WebmanCoroutine;

use Webman\App;
use Webman\Http\Request;
use Workbunny\WebmanCoroutine\Handlers\HandlerInterface;
use Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine;
use Workbunny\WebmanCoroutine\Utils\WaitGroup\WaitGroup;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;

/**
 *  協(xié)程化web服務(wù)進(jìn)程
 */
class CoroutineWebServer extends App
{

    /**
     * 每個(gè)連接的協(xié)程計(jì)數(shù)
     *
     * @var int[]
     */
    protected static array $_connectionCoroutineCount = [];

    /**
     * 獲取連接的協(xié)程計(jì)數(shù)
     *
     * @return int[]|int
     */
    public static function getConnectionCoroutineCount(?string $connectionId = null): array|int
    {
        return $connectionId === null
            ? static::$_connectionCoroutineCount
            : (static::$_connectionCoroutineCount[$connectionId] ?? 0);
    }

    /**
     * 回收連接的協(xié)程計(jì)數(shù)
     *
     * @param string $connectionId
     * @param bool $force
     * @return void
     */
    public static function unsetConnectionCoroutineCount(string $connectionId, bool $force = false): void
    {
        if (!$force and self::getConnectionCoroutineCount($connectionId) > 0) {
            return;
        }
        unset(static::$_connectionCoroutineCount[$connectionId]);
    }

    /** @inheritdoc  */
    public function onWorkerStart($worker)
    {
        if (!\config('plugin.workbunny.webman-coroutine.app.enable', false)) {
            return;
        }
        parent::onWorkerStart($worker);
        /** @var HandlerInterface $handler */
        $handler = Factory::getCurrentHandler();
        $handler::initEnv();
    }

    /**
     * 停止服務(wù)
     *
     *  - 不用返回值和參數(shù)標(biāo)定是為了兼容
     *
     * @param Worker|mixed $worker
     * @return void
     */
    public function onWorkerStop($worker, ...$params)
    {
        if (is_callable($call = [parent::class, 'onWorkerStop'])) {
            call_user_func($call, $worker, ...$params);
        }
    }

    /**
     * 連接建立
     *
     *  - 不用返回值和參數(shù)標(biāo)定是為了兼容
     *
     * @param ConnectionInterface $connection
     * @param mixed ...$params
     * @return void
     */
    public function onConnect($connection, ...$params): void
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onConnect'])) {
            // 協(xié)程化創(chuàng)建連接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
    }

    /**
     * 連接關(guān)閉
     *
     *  - 不用返回值和參數(shù)標(biāo)定是為了兼容
     *
     * @param ConnectionInterface|mixed $connection
     * @param ...$params
     * @return void
     */
    public function onClose($connection, ...$params)
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onClose'])) {
            // 協(xié)程化關(guān)閉連接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
        self::unsetConnectionCoroutineCount(spl_object_hash($connection), true);
    }

    /**
     * @link parent::onMessage()
     * @param ConnectionInterface|mixed $connection
     * @param Request|mixed $request
     * @param ...$params
     * @return null
     * @link parent::onMessage()
     */
    public function onMessage($connection, $request, ...$params)
    {
        if (!is_object($connection)) {
            return null;
        }
        $connectionId = spl_object_hash($connection);
        $params = func_get_args();
        $res = null;
        // 檢測協(xié)程數(shù)
        if (($consumerCount = \config('plugin.workbunny.webman-coroutine.app.consumer_count', 0)) > 0) {
            // 等待協(xié)程回收
            wait_for(function () use ($connectionId, $consumerCount) {
                return self::getConnectionCoroutineCount($connectionId) <= $consumerCount;
            });
        }

        $waitGroup = new WaitGroup();
        $waitGroup->add();
        // 請(qǐng)求消費(fèi)協(xié)程
        new Coroutine(function () use (&$res, $waitGroup, $params, $connectionId) {
            $res = parent::onMessage(...$params);
            // 計(jì)數(shù) --
            self::$_connectionCoroutineCount[$connectionId] --;
            // 嘗試回收
            self::unsetConnectionCoroutineCount($connectionId);
            // wg完成
            $waitGroup->done();
        });
        // 計(jì)數(shù) ++
        self::$_connectionCoroutineCount[$connectionId] =
            (isset(self::$_connectionCoroutineCount[$connectionId])
                ? self::$_connectionCoroutineCount[$connectionId] + 1
                : 1);
        // 等待
        $waitGroup->wait();
        return $res;
    }
}

CoroutineWebServer是繼承并代理了App的onMessage方法,將原本的方法執(zhí)行回調(diào)化,并且做到了非侵入onMessage的執(zhí)行邏輯,較為安全的支持了未來webman可能的升級(jí)改動(dòng)。

另外對(duì)于workerman 4.x下的event驅(qū)動(dòng)也做了兼容,除了增加了swow的事件驅(qū)動(dòng)外,還重新實(shí)現(xiàn)了swoole的事件驅(qū)動(dòng):

<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */

declare(strict_types=1);

namespace Workbunny\WebmanCoroutine\Events;

use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Workbunny\WebmanCoroutine\Exceptions\EventLoopException;
use Workerman\Events\EventInterface;

class SwooleEvent implements EventInterface
{
    /** @var int[] All listeners for read event. */
    protected array $_reads = [];

    /** @var int[] All listeners for write event. */
    protected array $_writes = [];

    /** @var callable[] Event listeners of signal. */
    protected array $_signals = [];

    /** @var int[] Timer id to timer info. */
    protected array $_timer = [];

    /** @var int 定時(shí)器id */
    protected int $_timerId = 0;

    /**
     * @param bool $debug 測試用
     * @throws EventLoopException 如果沒有啟用拓展
     */
    public function __construct(bool $debug = false)
    {
        if (!$debug and !extension_loaded('swoole')) {
            throw new EventLoopException('Not support ext-swoole. ');
        }
    }

    /** @inheritdoc  */
    public function add($fd, $flag, $func, $args = [])
    {
        switch ($flag) {
            case EventInterface::EV_SIGNAL:
                if (!isset($this->_signals[$fd])) {
                    if ($res = Process::signal($fd, $func)) {
                        $this->_signals[$fd] = $func;
                    }

                    return $res;
                }

                return false;
            case EventInterface::EV_TIMER:
            case EventInterface::EV_TIMER_ONCE:
                $timerId = $this->_timerId++;
                $this->_timer[$timerId] = Timer::after((int) ($fd * 1000), function () use ($timerId, $flag, $func) {
                    call_user_func($func);
                    if ($flag === EventInterface::EV_TIMER_ONCE) {
                        $this->del($timerId, $flag);
                    }
                });

                return $timerId;
            case EventInterface::EV_READ:
                if (\is_resource($fd)) {
                    if ($this->_reads[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_READ);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_READ)) {
                        $this->_reads[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            case self::EV_WRITE:
                if (\is_resource($fd)) {
                    if ($this->_writes[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_WRITE);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_WRITE)) {
                        $this->_writes[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                if ($this->_signals[$fd] ?? null) {
                    if (Process::signal($fd, null)) {
                        unset($this->_signals[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                if ($id = $this->_timer[$fd] ?? null) {
                    if (Timer::clear($id)) {
                        unset($this->_timer[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_READ:
                if (
                    \is_resource($fd) and
                    isset($this->_reads[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_READ)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_reads[$key]);

                        return true;
                    }
                }

                return false;
            case self::EV_WRITE:
                if (
                    \is_resource($fd) and
                    isset($this->_writes[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_WRITE)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_writes[$key]);

                        return true;
                    }
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function loop()
    {
        // 阻塞等待
        Event::wait();
        // 確定loop為退出狀態(tài)
        exit(0);
    }

    /** @inheritdoc  */
    public function destroy()
    {
        // 移除所有定時(shí)器
        $this->clearAllTimer();
        // 退出所有協(xié)程
        foreach (Coroutine::listCoroutines() as $coroutine) {
            Coroutine::cancel($coroutine);
        }
        // 退出event loop
        Event::exit();
        $this->_reads = $this->_writes = [];
    }

    /** @inheritdoc  */
    public function clearAllTimer()
    {
        foreach ($this->_timer as $id) {
            Timer::clear($id);
        }
        $this->_timer = [];
    }

    /** @inheritdoc  */
    public function getTimerCount()
    {
        return count($this->_timer);
    }
}

在測試workerman 5.x的過程中還找到了一些workerman的swoole驅(qū)動(dòng)的bug,我進(jìn)行了PR,積極參與維護(hù),fix: all coroutines must be canceled before Event::exit #1059

其他更多特性及功能請(qǐng)參考插件文檔,插件也支持純workerman開發(fā)環(huán)境,webman-coroutine文檔

一些經(jīng)驗(yàn)

1. 協(xié)程并不是銀彈,并不會(huì)讓一些原本耗費(fèi)時(shí)間的邏輯變短,它只是能合理的利用阻塞的間歇去處理其他的業(yè)務(wù),本質(zhì)上是用空間換時(shí)間

2. PHP的數(shù)組和對(duì)象是存放在堆中的數(shù)據(jù),其他如字符串、整數(shù)等是在棧上

  • 協(xié)程的切換中會(huì)自動(dòng)保存寄存器和棧信息,但不會(huì)保存堆數(shù)據(jù),這也就意味著堆數(shù)據(jù)會(huì)被多個(gè)協(xié)程操作,導(dǎo)致競爭狀態(tài)
$a = new \stdClass();
$a->id = 1;
new Coroutine(function () use ($a) {
    // 一些業(yè)務(wù)邏輯
    $a->id = 2;
})
new Coroutine(function () use ($a) {
    // 一些業(yè)務(wù)邏輯
    $a->id = 3;
})
// 等待所有協(xié)程結(jié)束

// 由于每個(gè)協(xié)程的邏輯中可能存在協(xié)程切換出讓,結(jié)合對(duì)象是堆數(shù)據(jù)且引用,最后的結(jié)果不能保證是1或者2或者3
// 數(shù)組同理
echo $a->id;
  • 對(duì)于保存在棧上的數(shù)據(jù)如果進(jìn)行引用操作,也會(huì)存在競爭狀態(tài)
$a = 1;
new Coroutine(function () use (&$a) {
    // 一些業(yè)務(wù)邏輯
    $a = 2;
})
new Coroutine(function () use (&$a) {
    // 一些業(yè)務(wù)邏輯
    $a = 3;
})
// 等待所有協(xié)程結(jié)束

// 由于每個(gè)協(xié)程的邏輯中可能存在協(xié)程切換出讓,變量是引用,最后的結(jié)果不能保證是1或者2或者3
echo $a;
  • 堆數(shù)據(jù)可以利用clone進(jìn)行拷貝操作,但資源類型不可以clone
  • 可以通過協(xié)程id + 靜態(tài)數(shù)組結(jié)合來保存和銷毀需要處理的競態(tài)數(shù)據(jù),從而實(shí)現(xiàn)協(xié)程上下文
static array $context = [];

$a = 1;

$id1 = new Coroutine(function () use (&$id1) {
    $contextA = self::$context[$id]
    // 一些業(yè)務(wù)邏輯
    self::$context[$id1] = 2;
})
self::$context[$id1] = $a;

$id2 = new Coroutine(function () use (&$id2) {
    $contextB = self::$context[$id]
    // 一些業(yè)務(wù)邏輯
    self::$context[$id2] = 3;
})
self::$context[$id1] = $a;

// 等待所有協(xié)程結(jié)束

// 這里會(huì)輸出1
echo $a;

// 讀取上下文內(nèi)容,獲取協(xié)程結(jié)果, 一般這里不推薦直接上下文讀取,而是通過CSP模型的channel進(jìn)行傳遞
// 還要注意上下文的回收,避免靜態(tài)數(shù)組膨脹
echo self::$context[$id1];
echo self::$context[$id2];

// 以上并不是完整的上下文實(shí)現(xiàn)方案,只是一個(gè)偽代碼??!

3. 關(guān)于數(shù)據(jù)庫連接池

  • 數(shù)據(jù)庫協(xié)議一般是支持雙工的,但PDO是標(biāo)準(zhǔn)的blocking-I/O實(shí)現(xiàn)
  • PDO在發(fā)送SQL后會(huì)阻塞等待SQL的執(zhí)行結(jié)果,swow和swoole在底層hook了阻塞等待的過程,進(jìn)行了協(xié)程切換

    以pdo的mysql舉例:

    // https://github.com/php/php-src/blob/master/ext/pdo_mysql/mysql_driver.c
    static zend_long mysql_handle_doer(pdo_dbh_t *dbh, const zend_string *sql)
    {
        pdo_mysql_db_handle *H = (pdo_mysql_db_handle *)dbh->driver_data;
        PDO_DBG_ENTER("mysql_handle_doer");
        PDO_DBG_INF_FMT("dbh=%p", dbh);
        PDO_DBG_INF_FMT("sql=%.*s", (int)ZSTR_LEN(sql), ZSTR_VAL(sql));
        if (mysql_real_query(H->server, ZSTR_VAL(sql), ZSTR_LEN(sql))) {
            pdo_mysql_error(dbh);
            PDO_DBG_RETURN(-1);
        } else {
            my_ulonglong c = mysql_affected_rows(H->server);
            if (c == (my_ulonglong) -1) {
                pdo_mysql_error(dbh);
                PDO_DBG_RETURN(H->einfo.errcode ? -1 : 0);
            } else {
                /* MULTI_QUERY support - eat up all unfetched result sets */
                MYSQL_RES* result;
                while (mysql_more_results(H->server)) {
                    if (mysql_next_result(H->server)) {
                        pdo_mysql_error(dbh);
                        PDO_DBG_RETURN(-1);
                    }
                    result = mysql_store_result(H->server);
                    if (result) {
                        mysql_free_result(result);
                    }
                }
                PDO_DBG_RETURN((int)c);
            }
        }
    }

    以上代碼可以簡單理解為以下偽代碼

    $requestId = $mysqlClient->send('SQL');
    while (1) {
        $res = $mysqlClient->get($requestId);
        if ($res) {
            return $res;
        }
        // 超時(shí)等其他機(jī)制
        // 協(xié)程sleep出讓
    }
  • 如果協(xié)程共用同一個(gè)連接,由于PDO的BIO實(shí)現(xiàn)方式,所以可能導(dǎo)致N次協(xié)程的請(qǐng)求都被$mysqlClient->send('SQL');由DB服務(wù)器接收并依次執(zhí)行(來源于同一個(gè)連接的多次SQL是順序執(zhí)行),但可能存在后者的協(xié)程結(jié)果喚起了前者協(xié)程的$res = $mysqlClient->get($requestId);,從而導(dǎo)致數(shù)據(jù)錯(cuò)亂;這里本質(zhì)上是因?yàn)镻DO對(duì)象是堆數(shù)據(jù),在多個(gè)協(xié)程中是競態(tài)的,為了避免這樣的情況,有以下方案解決:
    • 為每個(gè)協(xié)程創(chuàng)建連接 【不推薦】
    • 實(shí)現(xiàn)連接對(duì)象池(連接池本質(zhì)上可以簡單理解和實(shí)現(xiàn)為一個(gè)可以合理管理數(shù)據(jù)庫連接對(duì)象上下文的靜態(tài)數(shù)組)【需要一定開發(fā)能力】
    • 使用協(xié)程版的數(shù)據(jù)庫,如 hyperf/database、hyperf/db

4. 其他需要池化的組件

  • 本質(zhì)上和數(shù)據(jù)庫存在的問題一樣,是對(duì)象/數(shù)組這種堆數(shù)據(jù)的競態(tài)問題
  • 如果不在意返回結(jié)果,其實(shí)就不用在意上下文問題

5. 更多經(jīng)驗(yàn),持續(xù)更新

愿景

目前webman/workerman的協(xié)程實(shí)現(xiàn)僅僅只是入了個(gè)門,主要解決了阻塞退化問題,能夠簡單的實(shí)現(xiàn)以下場景:

  • 長輪詢接口
  • 非阻塞timer調(diào)度
  • 隊(duì)列生產(chǎn)消費(fèi)
  • worker/server協(xié)程化

但還有很多基建需要社區(qū)出謀出力添磚加瓦,比如:

  • 少侵入/非侵入的改造,讓webman數(shù)據(jù)庫連接池化
  • 少侵入/非侵入的改造,讓workerman的組件協(xié)程化
  • 少侵入/非侵入的改造,讓composer組件協(xié)程化

當(dāng)然,在此之前,你可以使用所有基于swow\swoole\ripple\revolt協(xié)程驅(qū)動(dòng)開發(fā)的協(xié)程版組件,但我希望未來可以整合這些協(xié)程實(shí)現(xiàn)的組件,能夠有一個(gè)統(tǒng)一的使用方式(雖然難度相當(dāng)大,但也想試試);

歡迎大佬們共建,issuePR
文章如有錯(cuò)誤,敬請(qǐng)指正。謝謝?。?/strong>


2024-10-22 更新

目前已經(jīng)實(shí)現(xiàn)了較為基礎(chǔ)的Utils\Pool工具,可用于對(duì)象池化的實(shí)現(xiàn)

  • Utils\Poo\Debugger可用于檢測待池化的對(duì)象是否存在非法和風(fēng)險(xiǎn),拋出的異常可以自行捕獲進(jìn)行日志監(jiān)控或者是調(diào)試,詳細(xì)參考測試用例
    • Debugger使用了WeakMap來對(duì)生命周期內(nèi)的檢測對(duì)象進(jìn)行緩存,避免重復(fù)檢查
    • Debugger使用了生成器進(jìn)行遞歸檢測,更少的內(nèi)存占用,避免深遞歸內(nèi)存溢出
    • 風(fēng)險(xiǎn)/非法信息
      • 靜態(tài)數(shù)組屬于風(fēng)險(xiǎn)項(xiàng)
      • 靜態(tài)對(duì)象屬于風(fēng)險(xiǎn)項(xiàng)
      • 資源類型屬于非法項(xiàng)
  • Utils\Poo\Pool可用于實(shí)現(xiàn)連接池、資源鎖等,具體可參考文檔

建議和意見都可以提交issue

歡迎各位大佬的PR!

4152 28 24
28個(gè)評(píng)論

shanjian

大佬厲害

  • 暫無評(píng)論
guanhui07

good job

  • 暫無評(píng)論
農(nóng)民工

學(xué)習(xí)了

  • 暫無評(píng)論
smile1

前排

  • 暫無評(píng)論
walkor

  • 暫無評(píng)論
yin5th

感謝大佬分享?。?!

  • 暫無評(píng)論
初心by

兔神牛皮

  • 暫無評(píng)論
Tinywan

大佬厲害,666

  • 暫無評(píng)論
xiaoming

協(xié)程版的數(shù)據(jù)庫 本質(zhì)實(shí)現(xiàn)是 連接對(duì)象池嗎

  • chaz6chez 2024-10-08

    PDO的對(duì)象與數(shù)據(jù)庫客戶端連接一一對(duì)應(yīng),連接對(duì)象池打破了單例連接的這種做法,所以相同的數(shù)據(jù)庫會(huì)存在多個(gè)客戶端連接,這個(gè)對(duì)象池主要是為了合理的去管理上下文問題

深路瀟湘

看不懂,但依然給你點(diǎn)贊

  • chaz6chez 2024-10-08

    有不懂的地方可以在這里提問,我盡我所能解答疑問

10bang

  • 暫無評(píng)論
深藍(lán)

大佬玩底層,我們只能摸摸大佬的風(fēng),也想PR出點(diǎn)力,奈何能力不夠。

  • chaz6chez 2024-10-08

    使用過程中有任何需要實(shí)現(xiàn)的特性或者找到的bug??也可以積極提issue呀,代碼的注釋很齊全,也可以看看源碼,總有機(jī)會(huì)提pr的,加油!

wocall

大師??

  • 暫無評(píng)論
Jinson

666

  • 暫無評(píng)論
Gin

贊贊贊

  • 暫無評(píng)論
qq7467466

兔子大佬太強(qiáng)了

  • 暫無評(píng)論
tanhongbin

大佬就是大佬,底層都能改,我看底層代碼都費(fèi)勁,看5分鐘就能睡著

  • 暫無評(píng)論
xiaopi

老哥依舊穩(wěn)定輸出

  • 暫無評(píng)論
tang23

先贊后看

  • 暫無評(píng)論
晚安。

贊??

  • 暫無評(píng)論
liudada1204

先贊后看??

  • wocall 2024-10-11

    發(fā)現(xiàn)沒有點(diǎn)贊的地方

wolfcode

好文~
????????????
????????????
????????????
????????????
????????????
????????????

  • 暫無評(píng)論
ikun

如此好的文章 應(yīng)該置頂推薦

  • 暫無評(píng)論
皮皮俠

牛皮

  • 暫無評(píng)論
皮皮俠

看起來有點(diǎn)迷糊,那么現(xiàn)在的情況是由于workerman的數(shù)據(jù)庫不支持連接池,所以使用這個(gè)協(xié)程組件的情況下不能進(jìn)行數(shù)據(jù)庫操作嗎

  • Tinywan 2024-10-19

    數(shù)據(jù)庫連接池暫不支持

  • chaz6chez 2024-10-19

    插件內(nèi)有Pool工具,通過Pool自行封裝數(shù)據(jù)庫或者連接相關(guān)的工具就可以了,暫時(shí)還不能直接使用webman/workerman的數(shù)據(jù)庫插件

pengzhen

大佬,請(qǐng)問這個(gè)插件有生產(chǎn)環(huán)境使用嗎?我目前正在為公司項(xiàng)目尋找框架,選定webman+gatewaywork,但是苦于沒有協(xié)程,看到你這個(gè)插件,想用,又擔(dān)心有坑

  • chaz6chez 2024-10-25

    群里有大佬已經(jīng)準(zhǔn)備上生產(chǎn)了,我自己的項(xiàng)目目前還在測試環(huán)境沒有上生產(chǎn),這個(gè)項(xiàng)目目前是重心開發(fā)的,遇到任何問題都可以咨詢

  • pengzhen 2024-10-25

    有群嗎?

  • chaz6chez 2024-10-25

    webman微信群

  • pengzhen 2024-10-25

    幾群?

  • chaz6chez 2024-10-25

    我都在

pengzhen

大佬,讀了下文檔,有以下幾個(gè)問題不懂

1、webman 的webserver 要同時(shí)開啟 CoroutineWebServer和webman自帶的server嗎?那樣豈不是對(duì)外暴露了2個(gè)端口,webman自帶的可以去掉嗎?
2、操作數(shù)據(jù)庫可以用hypderf/database 解決連接池問題嗎?
3、上下文,用webman自帶的context類保存包括對(duì)象,數(shù)組這些數(shù)據(jù)類型可以嗎?還是必須要用協(xié)程id作為key區(qū)別開來

  • chaz6chez 2024-10-25

    1.可以關(guān)閉webman自帶的server
    2.使用swoole驅(qū)動(dòng)可以使用hyperf的db,但需要自己引入框架
    3.一般使用無需關(guān)注上下文,跨協(xié)程處理數(shù)據(jù)建議使用channel

  • pengzhen 2024-10-25

    我想用到的上下文其實(shí)就是想保存當(dāng)前這個(gè)請(qǐng)求的全局變量,類似fpm下的靜態(tài)屬性,處理完這個(gè)請(qǐng)求就銷毀的

  • chaz6chez 2024-10-25

    你思考的稍微簡單了一些

  • pengzhen 2024-10-25

    怎么說?目前項(xiàng)目中需要的就是請(qǐng)求級(jí)別的變量,暫時(shí)沒有跨協(xié)程的

  • chaz6chez 2024-10-25

    請(qǐng)求+容器就跨協(xié)程啊,我建議如果想要上生產(chǎn),自身對(duì)協(xié)程沒有那么熟悉的話,直接用hyperf全家套

  • xiaopi 2024-10-26

    3.我理解的實(shí)際上swoole的協(xié)程都在同一個(gè)進(jìn)程空間,可以共同使用進(jìn)程內(nèi)存資源,所以多個(gè)協(xié)程利用同一個(gè)數(shù)組保存數(shù)據(jù)是ok的,而webman自帶的Context類就是根據(jù)協(xié)程id作為數(shù)組key,區(qū)分不同的協(xié)程資源的,適用的場景就是不需要協(xié)程間協(xié)作的場景;
    而使用swoole的channel適用于需要協(xié)程間協(xié)作的場景,比如,一次請(qǐng)求下要記錄到數(shù)據(jù)庫、記錄日志,并將記錄的結(jié)果響應(yīng)給調(diào)用方,這種情況下解決方案就是:
    1.父協(xié)程創(chuàng)建協(xié)程A和協(xié)程B分別處理這兩件事
    2.父協(xié)程阻塞當(dāng)前協(xié)程,等待協(xié)程A和協(xié)程B的執(zhí)行結(jié)果
    3.父協(xié)程得到了結(jié)果,并響應(yīng)給調(diào)用放。

    上述的情況使用共享變量的情況下就很難處理,想要阻塞父協(xié)程,不使用內(nèi)置的waitGroup的情況下,就必須在父協(xié)程寫個(gè)循環(huán)體,并且為了不阻塞進(jìn)程,還需要寫個(gè)IO,即sleep ,while(true){ sleep(1)}
    而使用channel就很方便,直接在父協(xié)程pop兩次,協(xié)程A完成后push(),協(xié)程B完成后push(),偽代碼:

    $chan = new Channel();
    go(function(){
    //
    $chan->push()
    })
    go(function(){
    //
    $chan->push()
    })

    $chan->pop()
    $chan->pop()

    return response("ok");

  • chaz6chez 2024-10-29

    workerman如果使用協(xié)程,協(xié)程也在一個(gè)進(jìn)程空間,數(shù)據(jù)引用在協(xié)程之間是不隔離的,比如&的數(shù)據(jù),比如對(duì)象,比如資源類型,比如數(shù)組,只要是引用類型的數(shù)據(jù)都存在協(xié)程間的競爭狀態(tài),這樣的競爭狀態(tài)會(huì)導(dǎo)致數(shù)據(jù)可能被污染,為了達(dá)到數(shù)據(jù)不被污染的效果,除了對(duì)數(shù)據(jù)加鎖外,還可以通過channel進(jìn)行有序傳遞,也就是當(dāng)一個(gè)數(shù)據(jù)正在被一個(gè)協(xié)程消費(fèi)的時(shí)候,其他的協(xié)程是沒有從通道內(nèi)獲取到數(shù)據(jù)的,直到獲取到數(shù)據(jù)的協(xié)程消費(fèi)完畢,變相的其實(shí)也是一種鎖的機(jī)制,在pop不到數(shù)據(jù)的時(shí)候協(xié)程會(huì)自動(dòng)出讓控制權(quán);不論是上下文還是通道還是sync鎖,都是一種競爭數(shù)據(jù)的并發(fā)安全操作。

德瑪西亞

先贊后看,養(yǎng)成習(xí)慣。

  • 暫無評(píng)論

chaz6chez

5174
積分
0
獲贊數(shù)
0
粉絲數(shù)
2018-11-16 加入
×
??