国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

分享一個使用php發起websocket的ws協議鏈接的類

小七他哥

不多說廢話,上代碼:

<?php

/**
 * Thrown when a WebSocket URI cannot be parsed or does not use ws:// or wss://.
 */
class BadUriException extends Exception
{
}

/**
 * Thrown when the TCP/TLS connection to the WebSocket server cannot be opened.
 */
class ServerConnectException extends Exception
{
}

/**
 * Thrown when the HTTP upgrade handshake fails or cannot be validated.
 */
class HandshakeException extends Exception
{
}

/**
 * Thrown when a received frame cannot be interpreted.
 */
class BadFrameException extends Exception
{
}

/**
 * Thrown when reading from or writing to the socket fails, times out,
 * or is attempted after the connection has been closed.
 */
class SocketRWException extends Exception
{
}

class WebSocketClient {

    // URI schemes accepted by parseUri().
    const PROTOCOL_WS = 'ws';
    const PROTOCOL_WSS = 'wss';

    // Line separator and end-of-headers marker used to build and parse the HTTP handshake.
    const HTTP_HEADER_SEPARATION_MARK = "\r\n";
    const HTTP_HEADER_END_MARK = "\r\n\r\n";

    // Fixed GUID concatenated with the Sec-WebSocket-Key when checking Sec-WebSocket-Accept.
    const UUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

    // Chunk size (32768 bytes) used when splitting writes and when reading from the socket.
    const PACKET_SIZE = (1 << 15);

    // Continuation frame (a further fragment of the current message)
    const OPCODE_CONTINUATION_FRAME = 0;
    // Text frame
    const OPCODE_TEXT_FRAME = 1;
    // Binary frame
    const OPCODE_BINARY_FRAME = 2;
    // Connection close
    const OPCODE_CLOSE = 8;
    // ping
    const OPCODE_PING = 9;
    // pong
    const OPCODE_PONG = 10;

    // Largest payload length that fits in the 7-bit length field.
    const FRAME_LENGTH_LEVEL_1_MAX = 125;
    // Largest payload length that fits in the 16-bit extended length field.
    const FRAME_LENGTH_LEVEL_2_MAX = 65535;

    // URI scheme ('ws' or 'wss'), set by parseUri().
    private $protocol;

    // Server host name, set by parseUri().
    private $host;

    // Server port, set by parseUri().
    private $port;

    // Request target sent in the handshake GET line, set by parseUri().
    private $path;

    // Stream returned by stream_socket_client() in connect().
    private $sock;

    // Sec-WebSocket-Key value sent during the handshake.
    private $secWebSocketKey;

    // Becomes true once the server's Sec-WebSocket-Accept header has been verified.
    private $handshakePass = false;

    // Bytes read from the socket that have not been discarded yet.
    private $readBuf;

    // Number of bytes currently held in $readBuf.
    private $currentReadBufLen;

    // Offset of the next unread byte inside $readBuf.
    private $readBufPos;

    // True once the connection has been shut down; further reads and writes are refused.
    private $closed = false;

    /**
     * WebSocketClient constructor.
     *
     * Parses the URI, opens the socket, performs the opening handshake and
     * resets the read buffer, in that order.
     *
     * @param string $wsUri WebSocket URI (ws:// or wss://)
     * @param float $connectTimeout timeout, in seconds, for connecting to the server
     * @param float $rwTimeout timeout, in seconds, for socket reads and writes
     * @throws Exception
     */
    public function __construct($wsUri, $connectTimeout = 1.0, $rwTimeout = 5.0) {
        $this->parseUri($wsUri);

        $this->connect($connectTimeout, $rwTimeout);

        $this->handshake();

        $this->initReadBuf();
    }

    /**
     * Parse a WebSocket URI and store its scheme, host, port and request target.
     *
     * The port defaults to 443 for wss:// and 80 for ws://. The request target
     * is the URI path (or "/" when absent) followed by the query string, if any.
     * A fragment is ignored: it is never part of an HTTP request target and
     * RFC 6455 forbids fragments in WebSocket URIs.
     *
     * @param string $wsUri URI such as ws://host[:port][/path][?query]
     * @throws BadUriException when the URI is malformed, has no host, or uses another scheme
     */
    protected function parseUri($wsUri) {
        $uriData = parse_url($wsUri);
        if (!$uriData) {
            throw new BadUriException('不正確的ws uri格式', __LINE__);
        }

        // URI schemes are case-insensitive; a missing scheme must not raise a notice.
        $scheme = isset($uriData['scheme']) ? strtolower($uriData['scheme']) : '';
        if ($scheme != self::PROTOCOL_WS && $scheme != self::PROTOCOL_WSS) {
            throw new BadUriException('ws的uri必須是以ws://或wss://開頭', __LINE__);
        }
        if (empty($uriData['host'])) {
            throw new BadUriException('不正確的ws uri格式', __LINE__);
        }

        $this->protocol = $scheme;
        $this->host = $uriData['host'];

        if (!empty($uriData['port'])) {
            $this->port = (int)$uriData['port'];
        } else {
            $this->port = ($this->protocol == self::PROTOCOL_WSS) ? 443 : 80;
        }

        // The previous "!empty(...) ?: '/'" stored boolean true instead of the path.
        $this->path = !empty($uriData['path']) ? $uriData['path'] : '/';
        if (!empty($uriData['query'])) {
            $this->path .= '?' . $uriData['query'];
        }
    }

    /**
     * Open the socket to the WebSocket server and apply the read/write timeout.
     *
     * Uses the ssl:// transport for wss and tcp:// otherwise.
     *
     * @param float $timeout timeout, in seconds, for establishing the connection
     * @param float $rwTimeout timeout, in seconds, for subsequent reads and writes
     * @throws ServerConnectException when the connection cannot be established
     */
    protected function connect($timeout, $rwTimeout) {
        $transport = ($this->protocol == self::PROTOCOL_WSS) ? 'ssl://' : 'tcp://';

        $this->sock = stream_socket_client(
            $transport . $this->host . ':' . $this->port,
            $errno,
            $errstr,
            $timeout
        );

        if (!$this->sock) {
            if ($errstr) {
                // The message literal here had lost its closing quote, which made the file unparsable.
                throw new ServerConnectException('連接ws服務(wù)器失敗: ' . $errstr, $errno);
            }

            throw new ServerConnectException('連接ws服務(wù)器失敗: 未知錯誤', __LINE__);
        }

        $this->setSockTimeout($rwTimeout);
    }

    /**
     * Set the socket's read/write timeout.
     *
     * The value is split into whole seconds and microseconds with plain float
     * arithmetic, so it no longer depends on the bcmath extension or on how the
     * number happens to be rendered as a string (e.g. "1.0E-5").
     *
     * @param float $seconds timeout in seconds; fractions are honoured
     */
    public function setSockTimeout($seconds) {
        $total = (float)$seconds;
        $wholeSeconds = (int)floor($total);
        $microseconds = (int)round(($total - $wholeSeconds) * 1000000);

        // Rounding can push the fraction up to a full second; carry it over.
        if ($microseconds >= 1000000) {
            $wholeSeconds += 1;
            $microseconds -= 1000000;
        }

        stream_set_timeout($this->sock, $wholeSeconds, $microseconds);
    }

    /**
     * Send raw bytes to the server, at most PACKET_SIZE bytes per writeN() call.
     *
     * @param string $data bytes to send
     * @throws SocketRWException when the connection is closed or a write fails
     */
    protected function writeToSock($data) {
        if ($this->closed) {
            throw new SocketRWException('連接已關(guān)閉, 不允許再發(fā)送消息', __LINE__);
        }

        // Small payloads go out in one call; larger ones are cut into PACKET_SIZE chunks.
        $chunks = (strlen($data) > self::PACKET_SIZE)
            ? str_split($data, self::PACKET_SIZE)
            : [$data];

        foreach ($chunks as $chunk) {
            $this->writeN($chunk);
        }
    }

    /**
     * Write the whole string to the socket, retrying after partial writes.
     *
     * The unsent remainder is always taken from the original string at the
     * running offset. A write that makes no progress (false or 0 bytes) is
     * reported as an error instead of being retried forever.
     *
     * @param string $str bytes to write
     * @throws SocketRWException when the connection is closed, the write times out, or it fails
     */
    protected function writeN($str) {
        if ($this->closed) {
            throw new SocketRWException('連接已關(guān)閉, 不允許再發(fā)送消息', __LINE__);
        }

        $len = strlen($str);
        $written = 0;
        while ($written < $len) {
            $n = fwrite($this->sock, $written === 0 ? $str : substr($str, $written));
            if ($n === false || $n === 0) {
                $meta = stream_get_meta_data($this->sock);
                if ($meta['timed_out']) {
                    throw new SocketRWException('向服務(wù)器發(fā)送數(shù)據(jù)超時', __LINE__);
                }
                throw new SocketRWException('無法發(fā)送數(shù)據(jù),socket連接已斷開?', __LINE__);
            }
            $written += $n;
        }
    }

    /**
     * Generate a Sec-WebSocket-Key: 16 random bytes, base64-encoded.
     *
     * Uses random_bytes() so the nonce is actually random rather than derived
     * from the clock-based uniqid() value.
     *
     * @return string 24-character base64 string
     * @throws Exception when no source of randomness is available
     */
    protected static function generateWsKey() {
        return base64_encode(random_bytes(16));
    }

    /**
     * Perform the WebSocket opening handshake.
     *
     * Sends the HTTP upgrade request, then reads the response header line by
     * line up to the blank line that terminates it. Reading by line means no
     * bytes past the header are consumed here, so frames the server sends
     * right after the handshake stay in the stream for later reads.
     *
     * @throws HandshakeException when the response is incomplete, is not a 101,
     *                            or lacks a valid Sec-WebSocket-Accept header
     * @throws Exception when the request cannot be written
     */
    protected function handshake() {
        $this->secWebSocketKey = self::generateWsKey();
        $headers = [
            'GET ' . $this->path . ' HTTP/1.1',
            'Host: ' . $this->host . ':' . $this->port,
            'Upgrade: websocket',
            'Connection: Upgrade',
            'Sec-WebSocket-Key: ' . $this->secWebSocketKey,
            'Sec-WebSocket-Version: 13',
        ];
        $htmlHeader = implode(self::HTTP_HEADER_SEPARATION_MARK, $headers) . self::HTTP_HEADER_END_MARK;
        $this->writeToSock($htmlHeader);

        $lines = [];
        $complete = false;
        while (($line = fgets($this->sock, 8192)) !== false) {
            $line = rtrim($line, "\r\n");
            if ($line === '') {
                // Blank line: end of the HTTP response header.
                $complete = true;
                break;
            }
            $lines[] = $line;
        }

        if (!$complete || count($lines) === 0) {
            throw new HandshakeException('握手失?。何帐猪憫?yīng)不是標(biāo)準(zhǔn)的http響應(yīng)', __LINE__);
        }

        // Check the status code itself, not just any "101" substring in the line.
        if (!preg_match('#^HTTP/\d+(?:\.\d+)?\s+101(?:\s|$)#i', $lines[0])) {
            throw new HandshakeException('握手失?。悍?wù)器返回http狀態(tài)碼不是101', __LINE__);
        }

        $expectedAccept = base64_encode(sha1($this->secWebSocketKey . self::UUID, true));
        $lineCount = count($lines);
        for ($i = 1; $i < $lineCount; $i++) {
            // Limit 2 keeps values that contain ':' intact; lines without ':' are skipped.
            $parts = explode(':', $lines[$i], 2);
            if (count($parts) !== 2) {
                continue;
            }
            if (strtolower(trim($parts[0])) == 'sec-websocket-accept') {
                if (trim($parts[1]) != $expectedAccept) {
                    throw new HandshakeException('握手失?。?sec-websocket-accept值校驗失敗', __LINE__);
                }
                $this->handshakePass = true;
                break;
            }
        }
        if (!$this->handshakePass) {
            throw new HandshakeException('握手失敗:缺少sec-websocket-accept http頭', __LINE__);
        }
    }

    /**
     * Build a single, final (FIN=1) WebSocket frame.
     *
     * For close frames the 2-byte status code is prepended to the payload and
     * masking is always enabled. The mask bit in the header is derived after
     * that decision, so the header always matches the bytes that follow.
     *
     * @param int $opCode frame opcode (one of the OPCODE_* constants)
     * @param string $playLoad payload bytes
     * @param bool $isMask whether to mask the payload (forced on for close frames)
     * @param int $status status code, used for close frames only
     * @return string the encoded frame
     * @throws Exception when no source of randomness is available for the masking key
     */
    protected function packFrame($opCode, $playLoad = '', $isMask = true, $status = 1000) {
        $playLoad = (string)$playLoad;

        if ($opCode == self::OPCODE_CLOSE) {
            // Close frames are always masked and carry the status code first.
            $isMask = true;
            $playLoad = pack('CC', (($status >> 8) & 0xff), $status & 0xff) . $playLoad;
        }

        $playLoadLen = strlen($playLoad);
        $firstByte = 0x80 | $opCode;
        $secondByte = $isMask ? 0x80 : 0x00;

        if ($playLoadLen <= self::FRAME_LENGTH_LEVEL_1_MAX) {
            $secondByte |= $playLoadLen;
            $frame = pack('CC', $firstByte, $secondByte);
        } elseif ($playLoadLen <= self::FRAME_LENGTH_LEVEL_2_MAX) {
            $secondByte |= 126;
            $frame = pack('CCn', $firstByte, $secondByte, $playLoadLen);
        } else {
            $secondByte |= 127;
            $frame = pack('CCJ', $firstByte, $secondByte, $playLoadLen);
        }

        if ($isMask) {
            $maskKey = random_bytes(4);
            $frame .= $maskKey;
            if ($playLoadLen > 0) {
                // Byte-wise string XOR against the repeated key; the result is
                // as long as the shorter operand, i.e. the payload.
                $playLoad = $playLoad ^ str_repeat($maskKey, (int)ceil($playLoadLen / 4));
            }
        }

        return $frame . $playLoad;
    }

    /**
     * Ping the server and wait for a pong.
     *
     * Frames received before the pong are read through recv() and dropped.
     * Any Exception raised while sending or receiving is swallowed and
     * reported as false.
     *
     * @return bool true when a pong frame arrives, false on any Exception
     */
    public function ping() {
        try {
            $this->writeToSock($this->packFrame(self::OPCODE_PING, '', true));

            // Keep reading until a pong shows up; recv() throws on timeout or close.
            while (true) {
                $reply = $this->recv();
                if ($reply->opcode == self::OPCODE_PONG) {
                    return true;
                }
            }
        } catch (Exception $e) {
            return false;
        }
    }

    /**
     * Send a pong frame to the server.
     *
     * @param string $playLoad application data to echo back; RFC 6455 expects a
     *                         pong to carry the same data as the ping it answers.
     *                         Defaults to an empty payload.
     * @throws Exception when the frame cannot be written
     */
    public function pong($playLoad = '') {
        $frame = $this->packFrame(self::OPCODE_PONG, $playLoad, true);
        $this->writeToSock($frame);
    }

    /**
     * Close the connection from the client side.
     *
     * Sends a close frame with status 1000, reads one frame from the peer and
     * then shuts the socket down. The reply is read with readFrame() rather
     * than recv(), because recv() answers a close frame with another close
     * frame, which would send a second close after the one written here.
     *
     * @return bool true when the peer acknowledged with a close frame; false
     *              otherwise, including when the connection was already closed
     */
    public function close() {
        if ($this->closed) {
            return false;
        }

        $acknowledged = false;
        try {
            $this->writeToSock($this->packFrame(self::OPCODE_CLOSE, '', true, 1000));
            // An active close waits for the peer's confirmation frame.
            $reply = $this->readFrame();
            $acknowledged = ($reply->opcode == self::OPCODE_CLOSE);
        } catch (\Throwable $e) {
            // Best effort: the socket is torn down below whatever happened.
        } finally {
            $this->closed = true;
            if (is_resource($this->sock)) {
                stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
            }
        }
        return $acknowledged;
    }

    /**
     * Shut the socket down without a closing handshake.
     *
     * Intended for when a ping fails or the server misbehaves. Does nothing if
     * the connection is already marked closed or no socket exists.
     */
    public function abnormalClose() {
        if ($this->closed || !$this->sock) {
            return;
        }

        $this->closed = true;
        try {
            stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
        } catch (\Throwable $ignored) {
        }
    }

    /**
     * Answer a close frame from the server: send a close frame with status
     * 1000, mark the connection closed and shut the socket down.
     *
     * @throws SocketRWException when the close frame cannot be written
     */
    protected function replyClosure() {
        $this->writeToSock(
            $this->packFrame(self::OPCODE_CLOSE, '', true, 1000)
        );

        $this->closed = true;
        stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
    }

    /**
     * Send one data message to the server as a single frame.
     *
     * @param string $data payload to send
     * @param int $opCode WebSocketClient::OPCODE_TEXT_FRAME or WebSocketClient::OPCODE_BINARY_FRAME
     * @param bool $isMask whether to mask the payload (default: yes)
     * @throws \InvalidArgumentException when $opCode is not a text or binary opcode
     * @throws Exception when the frame cannot be written
     */
    public function send($data, $opCode = self::OPCODE_TEXT_FRAME, $isMask = true) {
        $allowedOpCodes = [self::OPCODE_TEXT_FRAME, self::OPCODE_BINARY_FRAME];
        // Loose comparison on purpose, matching the original "!=" checks.
        if (!in_array($opCode, $allowedOpCodes)) {
            throw new \InvalidArgumentException('不支持的幀數(shù)據(jù)類型', __LINE__);
        }

        $this->writeToSock($this->packFrame($opCode, $data, $isMask));
    }

    /**
     * Reset the receive buffer to empty, with both its length and its read
     * position at zero.
     */
    private function initReadBuf() {
        $this->readBufPos = 0;
        $this->currentReadBufLen = 0;
        $this->readBuf = '';
    }

    /**
     * Return the next $len bytes from the read buffer, pulling more data from
     * the socket until enough bytes are buffered.
     *
     * A read counts as failed only when fread() returns false or an empty
     * string; a chunk that is literally "0" is valid data.
     *
     * @param int $len number of bytes to return
     * @return string the requested bytes
     * @throws SocketRWException when the connection is closed, the read times out, or it fails
     */
    private function fetchStrFromReadBuf($len = 1) {
        $target = $this->readBufPos + $len;

        while ($target > $this->currentReadBufLen) {
            if ($this->closed) {
                throw new SocketRWException('連接已關(guān)閉, 不允許再收取消息', __LINE__);
            }
            $read = fread($this->sock, self::PACKET_SIZE);
            if ($read === false || $read === '') {
                $meta = stream_get_meta_data($this->sock);
                if ($meta['timed_out']) {
                    throw new SocketRWException('讀取服務(wù)器數(shù)據(jù)超時', __LINE__);
                }
                throw new SocketRWException('無法讀取服務(wù)器數(shù)據(jù),錯誤未知', __LINE__);
            }
            $this->readBuf .= $read;
            $this->currentReadBufLen += strlen($read);
        }

        $str = substr($this->readBuf, $this->readBufPos, $len);
        $this->readBufPos += $len;
        return $str;
    }

    /**
     * Consume one byte from the read buffer and return its numeric value.
     *
     * @return int byte value in the range 0-255
     * @throws SocketRWException when the byte cannot be read
     */
    private function fetchCharFromReadBuf() {
        // ord() returns the value of the first (here: only) byte of the string.
        return ord($this->fetchStrFromReadBuf(1));
    }

    /**
     * Drop up to $len already-consumed bytes from the front of the read buffer.
     *
     * Bytes that have not been consumed yet are never discarded: the amount is
     * capped at the current read position.
     *
     * @param int $len number of bytes to drop
     */
    private function discardReadBuf($len) {
        $drop = ($len > $this->readBufPos) ? $this->readBufPos : $len;
        if ($drop <= 0) {
            return;
        }

        $this->readBuf = substr($this->readBuf, $drop);
        $this->currentReadBufLen -= $drop;
        $this->readBufPos -= $drop;
    }

    /**
     * Receive the next frame or reassembled message from the server.
     *
     * - Ping: a pong carrying the same payload is sent, and the ping frame is returned.
     * - Pong: returned as is.
     * - Close: a close frame is sent back, the socket is shut down, and the close frame is returned.
     * - Text/binary: continuation frames are appended until a final fragment
     *   arrives. Control frames interleaved between fragments are handled on
     *   the spot and are not added to the message payload; a close received
     *   mid-message ends the read and that close frame is returned.
     *
     * @return WsDataFrame
     * @throws BadFrameException on an unknown opcode or an unexpected frame inside a fragmented message
     * @throws Exception on socket read/write errors
     */
    public function recv() {
        $dataFrame = $this->readFrame();
        switch ($dataFrame->opcode) {

            case self::OPCODE_PING:
                $this->writeToSock($this->packFrame(self::OPCODE_PONG, (string)$dataFrame->playload, true));
                break;

            case self::OPCODE_PONG:
                break;

            case self::OPCODE_CLOSE:
                // Control frames are never fragmented, so there is nothing further to read.
                $this->replyClosure();
                break;

            case self::OPCODE_TEXT_FRAME:
            case self::OPCODE_BINARY_FRAME:
                $fin = $dataFrame->fin;
                while ($fin == 0) {
                    $next = $this->readFrame();
                    switch ($next->opcode) {
                        case self::OPCODE_CONTINUATION_FRAME:
                            $dataFrame->playload .= $next->playload;
                            $fin = $next->fin;
                            break;

                        case self::OPCODE_PING:
                            $this->writeToSock($this->packFrame(self::OPCODE_PONG, (string)$next->playload, true));
                            break;

                        case self::OPCODE_PONG:
                            break;

                        case self::OPCODE_CLOSE:
                            $this->replyClosure();
                            return $next;

                        default:
                            // A new data frame may not start before the current message is finished.
                            throw new BadFrameException('無法識別的frame數(shù)據(jù)', __LINE__);
                    }
                }
                break;

            default:
                throw new BadFrameException('無法識別的frame數(shù)據(jù)', __LINE__);
        }
        return $dataFrame;
    }

    /**
     * Read and decode exactly one frame from the socket.
     *
     * Decodes FIN, opcode, the 7/16/64-bit payload length and, when the mask
     * bit is set, the 4-byte masking key, which is consumed even for an empty
     * payload so the stream stays aligned. For close frames with at least two
     * payload bytes, the leading status code is split off into $status.
     *
     * @return WsDataFrame
     * @throws BadFrameException when the 64-bit payload length is negative (invalid)
     * @throws SocketRWException when the frame cannot be read
     */
    protected function readFrame() {
        $firstByte = $this->fetchCharFromReadBuf();
        $fin = ($firstByte >> 7);
        $opcode = $firstByte & 0x0F;
        $secondByte = $this->fetchCharFromReadBuf();
        $isMasked = ($secondByte >> 7);
        $dataLen = $secondByte & 0x7F;
        if ($dataLen == 126) {
            // 2-byte unsigned integer, big-endian
            $dataLen = ($this->fetchCharFromReadBuf() << 8) + $this->fetchCharFromReadBuf();
        } elseif ($dataLen == 127) {
            // 8-byte unsigned integer, big-endian
            $dataLen = $this->fetchStrFromReadBuf(8);
            $res = unpack('Jlen', $dataLen);
            if (isset($res['len'])) {
                $dataLen = $res['len'];
            } else {
                $dataLen = (ord($dataLen[0]) << 56)
                    + (ord($dataLen[1]) << 48)
                    + (ord($dataLen[2]) << 40)
                    + (ord($dataLen[3]) << 32)
                    + (ord($dataLen[4]) << 24)
                    + (ord($dataLen[5]) << 16)
                    + (ord($dataLen[6]) << 8)
                    + ord($dataLen[7]);
            }
            if ($dataLen < 0) {
                // The top bit of a 64-bit length must be 0; a negative value is invalid.
                throw new BadFrameException('無法識別的frame數(shù)據(jù)', __LINE__);
            }
        }

        // The masking key follows the length whenever the mask bit is set,
        // regardless of the payload length.
        $maskChars = '';
        if ($isMasked) {
            $maskChars = $this->fetchStrFromReadBuf(4);
        }

        $data = '';
        $status = 0;
        if ($dataLen > 0) {
            $data = $this->fetchStrFromReadBuf($dataLen);
            if ($isMasked) {
                // Byte-wise string XOR against the repeated key; the result is
                // as long as the shorter operand, i.e. the payload.
                $data = $data ^ str_repeat($maskChars, (int)ceil($dataLen / 4));
            }
            if ($opcode == self::OPCODE_CLOSE && strlen($data) >= 2) {
                $status = (ord($data[0]) << 8) + ord($data[1]);
                $data = (string)substr($data, 2);
            }
        }

        // Everything consumed so far belongs to this frame; free it.
        $this->discardReadBuf($this->readBufPos);

        $dataFrame = new WsDataFrame();
        $dataFrame->opcode = $opcode;
        $dataFrame->fin = $fin;
        $dataFrame->status = $status;
        $dataFrame->playload = $data;
        return $dataFrame;
    }

    /**
     * Destructor: shuts the socket down through abnormalClose(), which does
     * nothing when the connection is already marked closed.
     */
    public function __destruct() {
        $this->abnormalClose();
    }
}

/**
 * Plain data holder describing one received WebSocket frame, or a message
 * reassembled from several fragments.
 */
class WsDataFrame
{
    /**
     * @var int frame opcode
     */
    public $opcode;

    /**
     * @var int FIN bit: tells whether the frame is the final fragment of its message
     */
    public $fin;

    /**
     * @var int status code of a close frame, if one was present
     */
    public $status;

    /**
     * @var string payload carried by the frame
     */
    public $playload;
}

下面看看調用代碼:

// Usage example: connect, send one message, read one reply, then close.
// NOTE(review): $data and $GLOBALS['ws_server_config'] are not defined in this snippet;
// presumably the surrounding application provides them — confirm.
try {
        $ws = new WebSocketClient('ws://' . $GLOBALS['ws_server_config']['host'] . ':' . $GLOBALS['ws_server_config']['port']);
        $ws->send($data);
        $frame = $ws->recv();
        // Debug aid (disabled): print the payload received from the server.
        $ws->close();
    } catch (\Exception $e) {
        // Debug aid (disabled): print the error details.
        // Any exception is silently ignored here.
    }

這個其實也是我在網上找的代碼,個人感覺比 workerman 官方的那個要簡潔一些。

官方的方案是:http://doc.workerman.net/faq/as-wss-client.html

感覺這個好複雜,我要發起一個ws協議鏈接,還得起一個worker?感覺動作太大了。

我還是個新手,歡迎大家噴一下,我這個有問題沒,謝謝。

4104 3 0
3個評論

小七他哥

@walkor 官方能出一個類似這樣的簡單一點的類嗎?直接

$client = new WebsocketClient($link);
$client->send('hello');
$client->close();

這樣的。

  • 暫無評論
nitron

ws的就是為了長連接。你回復里這種連接后發送消息后就關閉的場景,跟普通http請求的差異在哪里呢?

  • 暫無評論
小七他哥

我要鏈接ws服務啊,沒法用一個http請求發到websocket服務上去吧?

  • 暫無評論
年代過于久遠,無法發表評論

小七他哥

60
積分
0
獲贊數(shù)
0
粉絲數(shù)
2021-06-23 加入
??