不多說廢話,上代碼:
<?php
/**
 * Raised when a WebSocket URI is rejected (malformed, or not ws:// / wss://).
 */
class BadUriException extends Exception
{
}
/**
 * Raised when the TCP/TLS connection to the WebSocket server cannot be opened.
 */
class ServerConnectException extends Exception
{
}
/**
 * Raised when the HTTP upgrade handshake with the server fails.
 */
class HandshakeException extends Exception
{
}
/**
 * Raised when a received frame cannot be interpreted.
 */
class BadFrameException extends Exception
{
}
/**
 * Raised when reading from or writing to the socket fails or times out.
 */
class SocketRWException extends Exception
{
}
class WebSocketClient {
const PROTOCOL_WS = 'ws';
const PROTOCOL_WSS = 'wss';
const HTTP_HEADER_SEPARATION_MARK = "\r\n";
const HTTP_HEADER_END_MARK = "\r\n\r\n";
const UUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
const PACKET_SIZE = (1 << 15);
// continuation frame (a further fragment of the current message)
const OPCODE_CONTINUATION_FRAME = 0;
// text frame
const OPCODE_TEXT_FRAME = 1;
// binary frame
const OPCODE_BINARY_FRAME = 2;
// close the connection
const OPCODE_CLOSE = 8;
// ping
const OPCODE_PING = 9;
// pong
const OPCODE_PONG = 10;
const FRAME_LENGTH_LEVEL_1_MAX = 125;
const FRAME_LENGTH_LEVEL_2_MAX = 65535;
private $protocol;
private $host;
private $port;
private $path;
private $sock;
private $secWebSocketKey;
private $handshakePass = false;
private $readBuf;
private $currentReadBufLen;
private $readBufPos;
private $closed = false;
/**
 * Open a WebSocket connection.
 *
 * Runs, in order: URI parsing, socket connect, HTTP upgrade handshake,
 * and read-buffer initialisation.
 *
 * @param string $wsUri          WebSocket URI (ws:// or wss://)
 * @param float  $connectTimeout Timeout, in seconds, for connecting to the server
 * @param float  $rwTimeout      Timeout, in seconds, for socket reads and writes
 * @throws Exception
 */
public function __construct($wsUri, $connectTimeout = 1.0, $rwTimeout = 5.0)
{
    // Each step relies on state set up by the one before it.
    $this->parseUri($wsUri);
    $this->connect($connectTimeout, $rwTimeout);
    $this->handshake();
    $this->initReadBuf();
}
/**
 * Parse a WebSocket URI into protocol, host, port and request path.
 *
 * The scheme is matched case-insensitively and stored lower-cased. When no
 * port is given, 443 is used for wss and 80 for ws. The request path is the
 * URI path (or "/" when absent) plus the query string. A fragment is never
 * sent to the server, so it is dropped.
 *
 * @param string $wsUri
 * @throws BadUriException when the URI is malformed, has no host, or is not ws:// / wss://
 */
protected function parseUri($wsUri)
{
    $uriData = parse_url($wsUri);
    if (!$uriData) {
        throw new BadUriException('不正確的ws uri格式', __LINE__);
    }

    // parse_url() omits keys for missing components, so never index blindly.
    $scheme = isset($uriData['scheme']) ? strtolower($uriData['scheme']) : '';
    if ($scheme != self::PROTOCOL_WS && $scheme != self::PROTOCOL_WSS) {
        throw new BadUriException('ws的uri必須是以ws://或wss://開頭', __LINE__);
    }
    if (empty($uriData['host'])) {
        throw new BadUriException('不正確的ws uri格式', __LINE__);
    }

    $this->protocol = $scheme;
    $this->host = $uriData['host'];

    if (!empty($uriData['port'])) {
        $this->port = (int)$uriData['port'];
    } else {
        $this->port = ($scheme == self::PROTOCOL_WSS) ? 443 : 80;
    }

    // The previous `!empty(...) ?: '/'` stored boolean true instead of the path.
    $this->path = !empty($uriData['path']) ? $uriData['path'] : '/';
    if (!empty($uriData['query'])) {
        $this->path .= '?' . $uriData['query'];
    }
    // The fragment is deliberately left out: it is not part of an HTTP
    // request target and is meaningless for WebSocket URIs.
}
/**
 * Connect to the WebSocket server and apply the read/write timeout.
 *
 * Uses the ssl:// transport for wss and tcp:// otherwise.
 *
 * @param float $timeout   Timeout, in seconds, for establishing the connection
 * @param float $rwTimeout Timeout, in seconds, for socket reads and writes
 * @throws ServerConnectException when the connection cannot be established
 */
protected function connect($timeout, $rwTimeout)
{
    $transport = ($this->protocol == self::PROTOCOL_WSS) ? 'ssl://' : 'tcp://';
    $errno = 0;
    $errstr = '';
    $this->sock = stream_socket_client(
        $transport . $this->host . ':' . $this->port,
        $errno,
        $errstr,
        $timeout
    );
    if (!$this->sock) {
        if ($errstr) {
            // This literal had lost its closing quote, which broke parsing of the whole file.
            throw new ServerConnectException('連接ws服務器失敗: ' . $errstr, $errno);
        }
        throw new ServerConnectException('連接ws服務器失敗: 未知錯誤', __LINE__);
    }
    $this->setSockTimeout($rwTimeout);
}
/**
 * Set the socket read/write timeout.
 *
 * The value is split into whole seconds and microseconds with plain float
 * arithmetic, so it needs neither the bcmath extension nor a string
 * representation of the float.
 *
 * @param float $seconds Timeout in seconds; fractions are honoured to the microsecond
 */
public function setSockTimeout($seconds)
{
    $seconds = (float)$seconds;
    $wholeSeconds = (int)$seconds;
    $microseconds = (int)round(($seconds - $wholeSeconds) * 1000000);
    // Rounding can land exactly on one full second; carry it over.
    if ($microseconds >= 1000000) {
        $wholeSeconds++;
        $microseconds -= 1000000;
    }
    stream_set_timeout($this->sock, $wholeSeconds, $microseconds);
}
/**
 * Write data to the socket, in pieces of at most PACKET_SIZE bytes.
 *
 * @param string $data
 * @throws SocketRWException when the connection is already closed or a write fails
 */
protected function writeToSock($data)
{
    if ($this->closed) {
        throw new SocketRWException('連接已關(guān)閉, 不允許再發(fā)送消息', __LINE__);
    }

    // Payloads that fit in one packet are written as-is; larger ones are cut up.
    $pieces = strlen($data) > self::PACKET_SIZE
        ? str_split($data, self::PACKET_SIZE)
        : [$data];

    foreach ($pieces as $piece) {
        $this->writeN($piece);
    }
}
/**
 * Write the whole string to the socket, retrying after short writes.
 *
 * @param string $str
 * @throws SocketRWException when the connection is closed, the write times out, or no bytes can be written
 */
protected function writeN($str)
{
    if ($this->closed) {
        throw new SocketRWException('連接已關閉, 不允許再發送消息', __LINE__);
    }
    $len = strlen($str);
    $written = 0;
    while ($written < $len) {
        // Always slice from the untouched original string. The previous code
        // re-sliced an already-trimmed string by the cumulative offset and so
        // skipped bytes after the second short write.
        $chunk = ($written === 0) ? $str : substr($str, $written);
        $n = fwrite($this->sock, $chunk);
        // A zero-byte write makes no progress; treating it as a failure
        // prevents an endless loop on a dead or stalled connection.
        if ($n === false || $n === 0) {
            $meta = stream_get_meta_data($this->sock);
            if ($meta['timed_out']) {
                throw new SocketRWException('向服務器發送數據超時', __LINE__);
            }
            throw new SocketRWException('無法發送數據,socket連接已斷開?', __LINE__);
        }
        $written += $n;
    }
}
/**
 * Generate a random Sec-WebSocket-Key: 16 random bytes, base64-encoded.
 *
 * Uses random_bytes() rather than md5(uniqid() . mt_rand()), which is
 * largely predictable from the clock.
 *
 * @return string
 * @throws Exception when no source of randomness is available
 */
protected static function generateWsKey()
{
    return base64_encode(random_bytes(16));
}
/**
 * Perform the WebSocket opening handshake.
 *
 * Sends the HTTP upgrade request, then reads the response header line by
 * line up to the blank line that ends it. Reading by line stops exactly at
 * the end of the header, so frame bytes the server sends right after the
 * handshake are not consumed and discarded here.
 *
 * @throws HandshakeException when the response is incomplete, is not a 101, or fails Sec-WebSocket-Accept validation
 * @throws Exception
 */
protected function handshake()
{
    $this->secWebSocketKey = self::generateWsKey();
    $requestLines = [
        'GET ' . $this->path . ' HTTP/1.1',
        'Host: ' . $this->host . ':' . $this->port,
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Key: ' . $this->secWebSocketKey,
        'Sec-WebSocket-Version: 13',
    ];
    $this->writeToSock(
        implode(self::HTTP_HEADER_SEPARATION_MARK, $requestLines) . self::HTTP_HEADER_END_MARK
    );

    $responseLines = [];
    $headerBytes = 0;
    while (true) {
        $line = fgets($this->sock, 8192);
        if ($line === false) {
            // EOF or timeout before the end of the header.
            throw new HandshakeException('握手失敗: 握手響應不是標準的http響應', __LINE__);
        }
        $headerBytes += strlen($line);
        if ($headerBytes > 65536) {
            // Refuse to buffer an unbounded header.
            throw new HandshakeException('握手失敗: 握手響應不是標準的http響應', __LINE__);
        }
        $line = rtrim($line, "\r\n");
        if ($line === '') {
            break;
        }
        $responseLines[] = $line;
    }

    // Require 101 in the status-code position, not anywhere in the line.
    $statusLine = $responseLines[0] ?? '';
    if (preg_match('#^HTTP/\d+(?:\.\d+)?\s+101(?:\s|$)#i', $statusLine) !== 1) {
        throw new HandshakeException('握手失敗: 服務器返回http狀態碼不是101', __LINE__);
    }

    $expectedAccept = base64_encode(sha1($this->secWebSocketKey . self::UUID, true));
    foreach (array_slice($responseLines, 1) as $headerLine) {
        $parts = explode(':', $headerLine, 2);
        if (count($parts) !== 2) {
            // Not a "name: value" line; ignore it.
            continue;
        }
        if (strtolower(trim($parts[0])) !== 'sec-websocket-accept') {
            continue;
        }
        // Strict comparison: loose == can treat distinct numeric-looking strings as equal.
        if (trim($parts[1]) !== $expectedAccept) {
            throw new HandshakeException('握手失敗: sec-websocket-accept值校驗失敗', __LINE__);
        }
        $this->handshakePass = true;
        break;
    }

    if (!$this->handshakePass) {
        throw new HandshakeException('握手失敗:缺少sec-websocket-accept http頭', __LINE__);
    }
}
/**
 * Build a single final (FIN=1) WebSocket frame.
 *
 * Close frames are always masked and carry the 2-byte big-endian status
 * code in front of the payload.
 *
 * @param int    $opCode   Frame opcode
 * @param string $playLoad Payload data
 * @param bool   $isMask   Whether to mask the payload (forced on for close frames)
 * @param int    $status   Status code, used only for close frames
 * @return string The encoded frame
 * @throws Exception when no source of randomness is available for the masking key
 */
protected function packFrame($opCode, $playLoad = '', $isMask = true, $status = 1000)
{
    $playLoad = (string)$playLoad;
    if ($opCode == self::OPCODE_CLOSE) {
        // Decide on masking before the header is built. Previously the mask
        // bit was computed first, so a close frame requested without a mask
        // was masked but not flagged as such.
        $isMask = true;
        $playLoad = pack('n', $status & 0xffff) . $playLoad;
    }

    $playLoadLen = strlen($playLoad);
    $firstByte = 0x80 | $opCode;
    $maskBit = $isMask ? 0x80 : 0x00;

    if ($playLoadLen <= self::FRAME_LENGTH_LEVEL_1_MAX) {
        $frame = pack('CC', $firstByte, $maskBit | $playLoadLen);
    } elseif ($playLoadLen <= self::FRAME_LENGTH_LEVEL_2_MAX) {
        $frame = pack('CCn', $firstByte, $maskBit | 126, $playLoadLen);
    } else {
        $frame = pack('CCJ', $firstByte, $maskBit | 127, $playLoadLen);
    }

    if ($isMask) {
        // 4-byte masking key from a strong random source.
        $maskKey = random_bytes(4);
        $frame .= $maskKey;
        if ($playLoadLen > 0) {
            // String XOR yields the length of the shorter operand, so the
            // repeated key never needs trimming.
            $playLoad = $playLoad ^ str_repeat($maskKey, (int)ceil($playLoadLen / 4));
        }
    }

    return $frame . $playLoad;
}
/**
 * Ping the server and wait for a pong.
 *
 * Frames received before the pong are read and dropped.
 *
 * @return bool true once a pong arrives, false if any Exception is raised while sending or receiving
 */
public function ping()
{
    try {
        $this->writeToSock($this->packFrame(self::OPCODE_PING, '', true));
        // The loop ends either with a pong or with an exception from recv().
        while (true) {
            if ($this->recv()->opcode == self::OPCODE_PONG) {
                return true;
            }
        }
    } catch (Exception $e) {
        return false;
    }
}
/**
 * Send a pong frame to the server.
 *
 * @param string $playLoad Application data to carry; when answering a ping this should be
 *                         the ping's own payload. Defaults to an empty payload.
 * @throws Exception
 */
public function pong($playLoad = '')
{
    $this->writeToSock($this->packFrame(self::OPCODE_PONG, $playLoad, true));
}
/**
 * Close the connection from the client side.
 *
 * Sends a close frame (status 1000), reads one frame as the peer's reply and
 * then shuts the socket down. The reply is read with readFrame() rather than
 * recv(), so the peer's close frame is not answered with a second close
 * frame and the socket is shut down only once.
 *
 * @return bool true when the peer answered with a close frame; false otherwise,
 *              including when the connection was already closed
 */
public function close()
{
    if ($this->closed) {
        // Nothing left to close; avoids shutting the socket down a second time.
        return false;
    }
    try {
        $this->writeToSock($this->packFrame(self::OPCODE_CLOSE, '', true, 1000));
        // An active close waits for one frame from the peer as confirmation.
        $reply = $this->readFrame();
        return $reply->opcode == self::OPCODE_CLOSE;
    } catch (\Throwable $e) {
        // Best effort: any failure still ends with the socket shut down below.
        return false;
    } finally {
        $this->closed = true;
        stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
    }
}
/**
 * Shut the socket down without a closing handshake.
 *
 * Meant for when a ping fails or the server misbehaves. Does nothing if the
 * connection is already marked closed or there is no socket.
 */
public function abnormalClose()
{
    if ($this->closed || !$this->sock) {
        return;
    }
    $this->closed = true;
    try {
        stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
    } catch (\Throwable $e) {
        // Ignored on purpose: this is a best-effort teardown.
    }
}
/**
 * Answer the server's close frame with our own, then shut the socket down.
 *
 * The socket is shut down and the connection marked closed even when
 * sending the reply fails. If the connection was already closed, the
 * write throws and nothing else happens.
 *
 * @throws SocketRWException
 */
protected function replyClosure()
{
    $wasClosed = $this->closed;
    try {
        $this->writeToSock($this->packFrame(self::OPCODE_CLOSE, '', true, 1000));
    } finally {
        if (!$wasClosed) {
            $this->closed = true;
            stream_socket_shutdown($this->sock, STREAM_SHUT_RDWR);
        }
    }
}
/**
 * Send one text or binary message as a single frame.
 *
 * @param string $data   Data to send
 * @param int    $opCode WebSocketClient::OPCODE_TEXT_FRAME or WebSocketClient::OPCODE_BINARY_FRAME
 * @param bool   $isMask Whether to mask the payload (on by default)
 * @throws \InvalidArgumentException for any other opcode
 * @throws Exception
 */
public function send($data, $opCode = self::OPCODE_TEXT_FRAME, $isMask = true)
{
    $isDataOpcode = ($opCode == self::OPCODE_TEXT_FRAME || $opCode == self::OPCODE_BINARY_FRAME);
    if (!$isDataOpcode) {
        throw new \InvalidArgumentException('不支持的幀數(shù)據(jù)類型', __LINE__);
    }
    $this->writeToSock($this->packFrame($opCode, $data, $isMask));
}
/**
 * Reset the receive buffer: no buffered bytes, read position at the start.
 */
private function initReadBuf()
{
    $this->readBufPos = 0;
    $this->currentReadBufLen = 0;
    $this->readBuf = '';
}
/**
 * Return $len bytes from the read buffer at the current position and advance past them.
 *
 * Reads from the socket, PACKET_SIZE bytes at a time, until the buffer
 * holds enough data.
 *
 * @param int $len Number of bytes to return
 * @return bool|string
 * @throws SocketRWException when the connection is closed, the read times out, or no data can be read
 */
private function fetchStrFromReadBuf($len = 1)
{
    $target = $this->readBufPos + $len;
    while ($target > $this->currentReadBufLen) {
        if ($this->closed) {
            throw new SocketRWException('連接已關閉, 不允許再收取消息', __LINE__);
        }
        $read = fread($this->sock, self::PACKET_SIZE);
        // Compare explicitly: a chunk consisting of the single character "0"
        // is falsy in PHP and used to be mistaken for a failed read.
        if ($read === false || $read === '') {
            $meta = stream_get_meta_data($this->sock);
            if ($meta['timed_out']) {
                throw new SocketRWException('讀取服務器數據超時', __LINE__);
            }
            throw new SocketRWException('無法讀取服務器數據,錯誤未知', __LINE__);
        }
        $this->readBuf .= $read;
        $this->currentReadBufLen += strlen($read);
    }
    $str = substr($this->readBuf, $this->readBufPos, $len);
    $this->readBufPos += $len;
    return $str;
}
/**
 * Return the byte value (0-255) of the next byte in the read buffer and advance past it.
 *
 * @return int
 * @throws SocketRWException
 */
private function fetchCharFromReadBuf()
{
    // Call the helper by its declared name; the previous call used a different letter case.
    $str = $this->fetchStrFromReadBuf(1);
    return ord($str[0]);
}
/**
 * Drop already-consumed bytes from the front of the read buffer.
 *
 * @param int $len Number of bytes to drop; capped at the current read position
 */
private function discardReadBuf($len)
{
    // Bytes at or beyond the read position are unprocessed and must stay.
    $drop = ($len > $this->readBufPos) ? $this->readBufPos : $len;
    if ($drop <= 0) {
        return;
    }
    $this->readBuf = substr($this->readBuf, $drop);
    $this->currentReadBufLen -= $drop;
    $this->readBufPos -= $drop;
}
/**
 * Receive one message from the server.
 *
 * - ping: a pong carrying the same payload is sent back, and the ping frame is returned.
 * - pong: returned as-is.
 * - close: the closure is acknowledged (see replyClosure()) and the close frame is returned.
 * - text/binary: continuation frames are appended until the final fragment.
 *   Control frames arriving between fragments are handled on the spot and
 *   kept out of the message payload; a close frame ends the wait and is
 *   returned instead of the partial message.
 *
 * @return WsDataFrame
 * @throws BadFrameException on an unexpected opcode
 * @throws Exception
 */
public function recv()
{
    $dataFrame = $this->readFrame();
    switch ($dataFrame->opcode) {
        case self::OPCODE_PING:
            $this->answerPing($dataFrame->playload);
            break;
        case self::OPCODE_PONG:
            break;
        case self::OPCODE_CLOSE:
            $this->replyClosure();
            break;
        case self::OPCODE_TEXT_FRAME:
        case self::OPCODE_BINARY_FRAME:
            $complete = ($dataFrame->fin != 0);
            while (!$complete) {
                $next = $this->readFrame();
                switch ($next->opcode) {
                    case self::OPCODE_CONTINUATION_FRAME:
                        $dataFrame->playload .= $next->playload;
                        $complete = ($next->fin != 0);
                        break;
                    case self::OPCODE_PING:
                        $this->answerPing($next->playload);
                        break;
                    case self::OPCODE_PONG:
                        break;
                    case self::OPCODE_CLOSE:
                        $this->replyClosure();
                        return $next;
                    default:
                        throw new BadFrameException('無法識別的frame數據', __LINE__);
                }
            }
            break;
        default:
            throw new BadFrameException('無法識別的frame數據', __LINE__);
    }
    return $dataFrame;
}

/**
 * Send a pong frame that echoes the given ping payload.
 *
 * @param string $playLoad Payload of the ping being answered
 * @throws SocketRWException
 */
private function answerPing($playLoad)
{
    $this->writeToSock($this->packFrame(self::OPCODE_PONG, $playLoad, true));
}
/**
 * Read and decode a single frame from the connection.
 *
 * Handles the 7-bit, 16-bit and 64-bit payload length encodings, unmasks
 * the payload when the mask bit is set, and for close frames splits the
 * 2-byte status code off the front of the payload.
 *
 * @return WsDataFrame
 * @throws SocketRWException when the frame cannot be read from the socket
 * @throws BadFrameException when the 64-bit payload length cannot be decoded or is negative
 */
protected function readFrame()
{
    $firstByte = $this->fetchCharFromReadBuf();
    $fin = ($firstByte >> 7);
    $opcode = $firstByte & 0x0F;

    $secondByte = $this->fetchCharFromReadBuf();
    $isMasked = ($secondByte >> 7);
    $dataLen = $secondByte & 0x7F;

    if ($dataLen == 126) {
        // 2-byte unsigned big-endian length
        $dataLen = ($this->fetchCharFromReadBuf() << 8) + $this->fetchCharFromReadBuf();
    } elseif ($dataLen == 127) {
        // 8-byte unsigned big-endian length
        $res = unpack('Jlen', $this->fetchStrFromReadBuf(8));
        if (!isset($res['len']) || $res['len'] < 0) {
            // A value with the top bit set comes back negative and cannot be a valid length.
            throw new BadFrameException('無法識別的frame數據', __LINE__);
        }
        $dataLen = $res['len'];
    }

    // The 4-byte masking key follows the length whenever the mask bit is set,
    // even for an empty payload. Skipping it would misalign the next frame.
    $maskKey = $isMasked ? $this->fetchStrFromReadBuf(4) : '';

    $data = '';
    $status = 0;
    if ($dataLen > 0) {
        $data = $this->fetchStrFromReadBuf($dataLen);
        if ($isMasked) {
            // String XOR yields the length of the shorter operand ($data).
            $data = $data ^ str_repeat($maskKey, (int)ceil($dataLen / 4));
        }
        if ($opcode == self::OPCODE_CLOSE) {
            if ($dataLen >= 2) {
                $status = (ord($data[0]) << 8) + ord($data[1]);
                $data = (string)substr($data, 2);
            } else {
                // A 1-byte close payload cannot hold a status code; drop it.
                $data = '';
            }
        }
    }

    // Everything up to the read position has been consumed.
    $this->discardReadBuf($this->readBufPos);

    $dataFrame = new WsDataFrame();
    $dataFrame->opcode = $opcode;
    $dataFrame->fin = $fin;
    $dataFrame->status = $status;
    $dataFrame->playload = $data;
    return $dataFrame;
}
/**
 * Shut the socket down when the object is destroyed, if it is still open.
 */
public function __destruct()
{
    $this->abnormalClose();
}
}
/**
 * A WebSocket data frame.
 * Class WsDataFrame
 * @package library\util
 */
class WsDataFrame {
/**
 * @var int $opcode Frame opcode
 */
public $opcode;
/**
 * @var int $fin Whether this frame is the final fragment of the message
 */
public $fin;
/**
 * @var int $status Status code carried by a close frame, if any
 */
public $status;
/**
 * @var string Data carried by the frame
 */
public $playload;
}
下面看看調用代碼:
// Usage example: connect, send one message, read one reply, then close.
try {
// Host and port are read from the global ws_server_config array.
$ws = new WebSocketClient('ws://' . $GLOBALS['ws_server_config']['host'] . ':' . $GLOBALS['ws_server_config']['port']);
// NOTE(review): $data is not defined in this snippet — presumably set by the surrounding code.
$ws->send($data);
$frame = $ws->recv();
//echo "Received server response: " . $frame->playload . PHP_EOL;
$ws->close();
} catch (\Exception $e) {
// Failures are swallowed here; the debug output below is commented out.
//echo "Error: ";
//var_dump($e->__toString());
}
這個其實也是我在網(wǎng)上找的代碼,個人感覺比 workerman 官方的那個要簡潔一些。
官方的方案是:http://doc.workerman.net/faq/as-wss-client.html
感覺這個好複雜,我要發起一個ws協議鏈接,還得起一個worker?感覺動作太大了。
我還是個新手,歡迎大家噴一下,我這個有問題沒,謝謝。
@walkor 官方能出一個類似這樣的簡單一點的類嗎?直接
$client = new WebsocketClient($link);
$client->send('hello');
$client->close();
這樣的。