workerman/mqtt
MQTT是一個(gè)客戶端服務(wù)端架構(gòu)的發(fā)布/訂閱模式的消息傳輸協(xié)議,已經(jīng)成為物聯(lián)網(wǎng)的重要組成部分。它的設(shè)計(jì)思想是輕巧、開放、簡(jiǎn)單、規(guī)范,易于實(shí)現(xiàn)。這些特點(diǎn)使得它對(duì)很多場(chǎng)景來(lái)說(shuō)都是很好的選擇,特別是對(duì)于受限的環(huán)境如機(jī)器與機(jī)器的通信(M2M)以及物聯(lián)網(wǎng)環(huán)境(IoT)。
workerman\mqtt 是一個(gè)基于workerman的異步mqtt 客戶端庫(kù),可用于接收或者發(fā)送mqtt協(xié)議的消息。支持QoS 0
、QoS 1
、QoS 2
。支持MQTT
3.1
3.1.1
5
版本。
項(xiàng)目地址
https://github.com/walkor/mqtt
安裝
composer require workerman/mqtt
支持
- MQTT
- MQTT5
- MQTT over websocket
示例
subscribe.php
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function(){
$mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->subscribe('test');
};
$mqtt->onMessage = function($topic, $content){
var_dump($topic, $content);
};
$mqtt->connect();
};
Worker::runAll();
命令行運(yùn)行 php subscribe.php start
啟動(dòng)
publish.php
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function(){
$mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->publish('test', 'hello workerman mqtt');
};
$mqtt->connect();
};
Worker::runAll();
命令行運(yùn)行 php publish.php start
啟動(dòng)。
workerman\mqtt\Client接口
Client::__construct()
Client::connect()
Client::publish()
Client::subscribe()
Client::unsubscribe()
Client::disconnect()
Client::close()
callback onConnect
callback onMessage
callback onError
callback onClose
__construct (string $address, [array $options])
創(chuàng)建一個(gè)mqtt客戶端實(shí)例。
-
$address
mqtt服務(wù)端地址,格式類似 'mqtt://test.mosquitto.org:1883'. -
$options
客戶端選項(xiàng)數(shù)組,可以設(shè)置以下選項(xiàng):keepalive
: 客戶端向服務(wù)端發(fā)送心跳的時(shí)間間隔,默認(rèn)50秒,設(shè)置成0代表不啟用心跳client_id
: 客戶端id,如果沒設(shè)置默認(rèn)是"workerman-mqtt-client-".mt_rand()
protocol_name
: 協(xié)議名,MQTT
(3.1.1版本)或者MQIsdp
(3.1版本),默認(rèn)是MQTT
protocol_level
:協(xié)議等級(jí),protocol_name
是MQTT
時(shí)值為4
,protocol_name
是MQIsdp
時(shí)值是3
clean_session
: 清理會(huì)話,默認(rèn)為true
。設(shè)置為false
可以接收到QoS 1
和QoS 2
級(jí)別的離線消息reconnect_period
: 重連時(shí)間間隔,默認(rèn)1
秒,0
代表不重連connect_timeout
: 連接mqtt超時(shí)時(shí)間,默認(rèn)30
秒username
: 用戶名,可選password
: 密碼,可選will
: 遺囑消息,當(dāng)客戶端斷線后Broker會(huì)自動(dòng)發(fā)送遺囑消息給其它客戶端. 格式為:topic
: 主題content
: 內(nèi)容qos
:QoS
等級(jí)retain
:retain
標(biāo)記
resubscribe
: 當(dāng)連接異常斷開并重連后,是否重新訂閱之前的主題,默認(rèn)為truebindto
用來(lái)指定本地以哪個(gè)ip和端口向Broker發(fā)起連接,默認(rèn)值為''ssl
ssl選項(xiàng),默認(rèn)是false
,如果設(shè)置為true
,則以ssl方式連接。同時(shí)支持傳入ssl上下文數(shù)組,用來(lái)配置本地證書等,ssl上下文參考https://php.net/manual/en/context.ssl.phpdebug
是否開啟debug模式,debug模式可以輸出與Broker通訊的詳細(xì)信息,默認(rèn)為false
uri
mqtt over websocket的uri地址,一般為/mqtt
connect()
連接Broker
publish(String $topic, String $content, [array $options], [callable $callback])
向某個(gè)主題發(fā)布一條消息
$topic
主題$content
消息$options
選項(xiàng)數(shù)組,包括qos
QoS
等級(jí),默認(rèn)0
retain
retain 標(biāo)記,默認(rèn)false
dup
重發(fā)標(biāo)志,默認(rèn)false
$callback
-function (\Exception $exception = null)
,(QoS為0時(shí)不支持此設(shè)置)當(dāng)發(fā)生錯(cuò)誤時(shí)或者發(fā)布成功時(shí)觸發(fā),$exception
是異常對(duì)象,當(dāng)沒有錯(cuò)誤發(fā)生時(shí)$exception
為null
,下同。
subscribe(mixed $topic, [array $options], [callable $callback])
訂閱一個(gè)主題或者多個(gè)主題
$topic
是一個(gè)字符串(訂閱一個(gè)主題)或者數(shù)組(訂閱多個(gè)主題),
當(dāng)訂閱多個(gè)主題時(shí),$topic
是主題是key,QoS
為值的數(shù)組,例如array('topic1'=> 0, 'topic2'=> 1)
$options
訂閱選項(xiàng)數(shù)組,包含以下設(shè)置:qos
QoS
等級(jí), 默認(rèn)0
$callback
-function (\Exception $exception = null, array $granted = [])
回調(diào)函數(shù),當(dāng)訂閱成功或者發(fā)生錯(cuò)誤時(shí)觸發(fā)exception
異常對(duì)象,無(wú)錯(cuò)誤發(fā)生時(shí)它是null
,下同granted
訂閱結(jié)果數(shù)組,類似array('topic' => 'qos', 'topic' => 'qos')
其中:topic
是訂閱的主題qos
Broker接受的QoS
等級(jí)
unsubscribe(mixed $topic, [callable $callback])
取消訂閱
$topic
是一個(gè)字符串或者字符串?dāng)?shù)組,類似array('topic1', 'topic2')
$callback
-function (\Exception $e = null)
, 成功或者失敗時(shí)觸發(fā)的回調(diào)
disconnect()
正常斷開與Broker的連接, DISCONNECT
報(bào)文會(huì)被發(fā)送到Broker.
close()
強(qiáng)制斷開與Broker的連接,不會(huì)發(fā)送DISCONNECT
報(bào)文給Broker.
callback onConnect(Client $mqtt)
當(dāng)與Broker連接建立完畢后觸發(fā)。這時(shí)候已經(jīng)收到了Broker的CONNACK
報(bào)文。
callback onMessage(String $topic, String $content, Client $mqtt)
function (topic, message, packet) {}
當(dāng)客戶端收到Publish報(bào)文時(shí)觸發(fā)
$topic
收到的主題$content
收到的消息內(nèi)容$mqtt
mqtt客戶端實(shí)例.
callback onError(\Exception $exception = null)
當(dāng)連接發(fā)生某種錯(cuò)誤時(shí)觸發(fā)
callback onClose()
當(dāng)連接關(guān)閉時(shí)觸發(fā),無(wú)論是客戶端主動(dòng)關(guān)閉還是服務(wù)端關(guān)閉都會(huì)觸發(fā)onClose