自從使用workerman實(shí)現(xiàn)物聯(lián)網(wǎng)終端接入以來(lái),我工作中的所有網(wǎng)絡(luò)場(chǎng)景(TCP\UDP\HTTP)等均使用workerman+channel以微服務(wù)方式實(shí)現(xiàn),開(kāi)發(fā)速度快,性能超級(jí)高。(幾十萬(wàn)臺(tái)設(shè)備同時(shí)接入都輕輕松松承受住)
之前多次關(guān)注過(guò)workerman的UDP服務(wù)器,但一沒(méi)有實(shí)現(xiàn)我想要的結(jié)果
由于近期的業(yè)務(wù)需求,外加HTTP3 QUIC協(xié)議的廣泛使用,workerman作為一個(gè)廣泛使用的高性能PHP網(wǎng)絡(luò)開(kāi)發(fā)框架,支持持久化的UDP通信是很有必要的。
一直以來(lái)想通過(guò)workerman編寫(xiě)個(gè)基于UDP的SIP服務(wù)器和實(shí)現(xiàn)GB28181的國(guó)標(biāo)協(xié)議,搭配SRS、ZLMediaKit或者monibuca,滿(mǎn)足攝像頭、硬盤(pán)錄像機(jī)設(shè)備的接入,也可配合FreeSwitch實(shí)現(xiàn)基于SIP的語(yǔ)音通話(huà)或視頻會(huì)議系統(tǒng)
workerman 主動(dòng)發(fā)送udp數(shù)據(jù)
http://www.wtbis.cn/q/2688
UDP服務(wù)器主動(dòng)向客戶(hù)端發(fā)送消息
http://www.wtbis.cn/q/4284
直到今天終于使用workerman 實(shí)現(xiàn)單進(jìn)程或多進(jìn)程方式監(jiān)聽(tīng)某個(gè)UDP端口,主動(dòng)從平臺(tái)向客戶(hù)端發(fā)送數(shù)據(jù)
并且所有功能均使用workerman的loop功能,能夠發(fā)揮平臺(tái)最大化性能
當(dāng)進(jìn)程只有一個(gè)時(shí)使用 socket 函數(shù)實(shí)現(xiàn)端口監(jiān)聽(tīng),當(dāng)進(jìn)程大于1個(gè)時(shí)使用stream_socket實(shí)現(xiàn)端口監(jiān)聽(tīng)(各有利弊,請(qǐng)酌情使用,大部分場(chǎng)景,推薦將進(jìn)程數(shù)保持與CPU數(shù)量一致,自動(dòng)使用 stream_socket )
經(jīng)過(guò)我初步測(cè)試:
當(dāng)使用stream_socket時(shí),服務(wù)器首次收到客戶(hù)端發(fā)送的數(shù)據(jù)后,能夠穩(wěn)定的向客戶(hù)端發(fā)送約5分鐘的數(shù)據(jù)報(bào)文,直到該通信會(huì)話(huà)被Linux內(nèi)核丟棄,因此使用UDP進(jìn)行通信,建議至少60秒進(jìn)行一次雙向心跳交互?;?/strong>。
當(dāng)使用socket時(shí),服務(wù)器首次收到客戶(hù)端發(fā)送的數(shù)據(jù)后,能夠穩(wěn)定的向客戶(hù)端長(zhǎng)期發(fā)送數(shù)據(jù)報(bào)文(如果網(wǎng)絡(luò)中的防火墻或NAT路由器沒(méi)有將會(huì)話(huà)過(guò)期,應(yīng)該可以一直使用)
下面直接發(fā)布代碼
<?php
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Lib\Timer;
$worker = new Worker();
$worker->count = 1; //開(kāi)啟進(jìn)程數(shù)量
$processName = "sip_server_udp5060";
$worker->transport = "udp";
$worker->name = $processName;
$worker->reusePort = true; //開(kāi)啟均衡負(fù)載模式
$date = date("Y-m-d");
Worker::$pidFile = "var/{$processName}.pid";
Worker::$logFile = "var/{$processName}_logFile.log";
Worker::$stdoutFile = "var/{$processName}_stdout.log";
$socket = null;
$workerId = null;
define("UNAUTHORIZED_KICKOFF_SECOND" , 1);
define("UNAUTHORIZED_ALLOW_TRYTIME" , 10);
define("UDP_SOCKET_TYPE_IS_STREAM" , $worker->count > 1 ? 1 : 0 );
$worker->onWorkerStart = function() {
global $worker , $workerId , $socket , $processName;
//定期與數(shù)據(jù)庫(kù)握手,避免被斷掉,該動(dòng)作每個(gè)進(jìn)程都得執(zhí)行
$workerId = $worker->id ;
$worker->connections = [];
$worker->connections_ur = [];
//根據(jù)daemon順序延時(shí),這是確保系統(tǒng)正常運(yùn)行的關(guān)鍵
usleep(1000 * 10 * ($worker->id+1) );
echo date("Y-m-d H:i:s")." 服務(wù)進(jìn)程{$worker->id}已經(jīng)啟動(dòng)!\n";
if( $workerId >= 0 ){
//計(jì)劃監(jiān)聽(tīng)的UDP端口
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
//這種模式只能運(yùn)行一個(gè)進(jìn)程
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
socket_bind($socket , "0.0.0.0" , 5060);
}else{
//這種模式可以多個(gè)進(jìn)程共同監(jiān)聽(tīng)同一端口
$context = stream_context_create();
$socket = stream_socket_server("udp://0.0.0.0:5060" , $error_code , $error_message , STREAM_SERVER_BIND , $context);
}
Worker::$globalEvent->add($socket, 1 , "acceptUdpConnection"); //加入全局事件loop
/*
//如果需要從服務(wù)器主動(dòng)發(fā)數(shù)據(jù)給客戶(hù)端,可以通過(guò)channel方式調(diào)用,具體的業(yè)務(wù)實(shí)現(xiàn)可以參照官方channel使用介紹
Channel\Client::connect(CHANNEL_SIP_IP , CHANNEL_SIP_PORT);
$event_name = $processName; //UDP的數(shù)據(jù)回復(fù)跟TCP不一樣,只要知道對(duì)方端口即可,理論上任意一個(gè)進(jìn)程均可從服務(wù)器端發(fā)送數(shù)據(jù)給客戶(hù)端
//到公網(wǎng)環(huán)境查詢(xún)本機(jī)公網(wǎng)IP,便于生成日志
Channel\Client::on($event_name, function($event_data)use($worker , $event_name) {
});
*/
//每隔一段時(shí)間對(duì)長(zhǎng)時(shí)間未進(jìn)行通信的臨時(shí)會(huì)話(huà)進(jìn)行清理
//不宜太大,太大容易遭受DDOS攻擊,也不宜太小,太小的話(huà)有可能還沒(méi)完成業(yè)務(wù)邏輯就被踢掉
Timer::add( UNAUTHORIZED_KICKOFF_SECOND , function (){
global $worker , $socket;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
foreach ($worker->connections_ur as $remote_address => $remote_arr){
if( $remote_arr['lastMsgTime'] < $timeNow - UNAUTHORIZED_KICKOFF_SECOND ){
//非法接入,直接踢掉
unset($worker->connections_ur[$remote_address]);
//print("{$dateTime} {$remote_address} 踢掉非法接入\n");
//sendto($remote_address ,"illegal connection!\n");
}
//平臺(tái)主動(dòng)發(fā)送數(shù)據(jù)給終端,
sendto($remote_address ,"{$dateTime} test send data!\n");
}
});
//每隔一段時(shí)間對(duì)已經(jīng)認(rèn)證過(guò)的會(huì)話(huà)進(jìn)行檢查,對(duì)于長(zhǎng)時(shí)間未通信的需要進(jìn)行清理
Timer::add( 60 , function (){
global $worker , $socket;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
foreach ($worker->connections as $remote_address => $remote_arr){
if( $remote_arr['lastMsgTime'] < $timeNow - 300 ){
//超時(shí)未通信,直接踢掉
unset($worker->connections[$remote_address]);
//這里編寫(xiě)遠(yuǎn)端會(huì)話(huà)離線(xiàn)的內(nèi)容
//print("{$dateTime} {$remote_address} 會(huì)話(huà)超時(shí)掉線(xiàn)\n");
//sendto($remote_address ,"connection timeout!\n");
}
}
});
}
};
function sendto($remote_address , $send_buffer ){
global $socket;
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
list($host, $port) = explode(":" , $remote_address );
socket_sendto($socket , $send_buffer , strlen($send_buffer), 0, $host, $port);
}else{
stream_socket_sendto( $socket, $send_buffer, 0, $remote_address);
}
}
function acceptUdpConnection($socket){
global $worker;
$dateTime = date("Y-m-d H:i:s");
$timeNow = time();
set_error_handler(function(){});
if(UDP_SOCKET_TYPE_IS_STREAM == 0){
socket_recvfrom($socket, $recv_buffer, 65535, 0, $from, $port);
$remote_address = "{$from}:{$port}";
}else{
$recv_buffer = stream_socket_recvfrom($socket, 65535 , 0, $remote_address);
}
restore_error_handler();
if (false === $recv_buffer || empty($remote_address)) {
return false;
}
if(
isset($worker->connections[$remote_address]['lastMsgTime'])
&& $worker->connections[$remote_address]['lastMsgTime'] >= $timeNow - 300
){
//已注冊(cè)成功
$worker->connections[$remote_address]['lastMsgTime'] = $timeNow;
//處理業(yè)務(wù)邏輯 針對(duì)已經(jīng)認(rèn)證的合法連接
//針對(duì)已經(jīng)認(rèn)證過(guò)的連接,建議將收到的數(shù)據(jù)通過(guò)channel發(fā)布到其他服務(wù)端進(jìn)行輪詢(xún)處理,以最大化提升系統(tǒng)處理性能,此時(shí),本程序僅僅充當(dāng)gateway功能
}else{
//未注冊(cè)成功 未認(rèn)證的可以進(jìn)行幾次通信
$tryCount = $worker->connections_ur[$remote_address]['tryCount']??1;
if( $tryCount < UNAUTHORIZED_ALLOW_TRYTIME ){
$worker->connections_ur[$remote_address]['lastMsgTime'] = $timeNow;
$worker->connections_ur[$remote_address]['tryCount'] = $tryCount + 1;
//sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,try time {$tryCount} \n");
//處理業(yè)務(wù)邏輯 對(duì)于未認(rèn)證的連接,必須在超時(shí)前完成認(rèn)證 完成認(rèn)證后需要將連接從 connections_ur 里面移出,并且加入到 connections 里面去
}else{
//超過(guò)一定交互次數(shù)但仍未完成認(rèn)證的會(huì)話(huà)將被忽視,直到被踢下線(xiàn)
//sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,max try \n");
}
}
//收到數(shù)據(jù),打印日志
print( "{$dateTime} {$remote_address}:\n{$recv_buffer}\n");
};
Worker::runAll();
/*
以下是程序輸出樣例數(shù)據(jù)
2024-03-27 01:15:52 39.129.72.95:50574:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.4:5060;rport;branch=z9hG4bK650883237
From: <sip:34020000001180870005@3402000000>;tag=698431213
To: <sip:34020000001180870005@3402000000>
Call-ID: 1146222786
CSeq: 1 REGISTER
Contact: <sip:34020000001180870005@192.168.1.4:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
2024-03-27 01:15:53 39.129.72.95:15363:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.2:5060;rport;branch=z9hG4bK1890963109
From: <sip:34020000001180870007@3402000000>;tag=1955740825
To: <sip:34020000001180870007@3402000000>
Call-ID: 254089720
CSeq: 1 REGISTER
Contact: <sip:34020000001180870007@192.168.1.2:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
2024-03-27 01:15:54 39.129.72.95:31137:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.8:5060;rport;branch=z9hG4bK1552460892
From: <sip:34020000001180870002@3402000000>;tag=1271059450
To: <sip:34020000001180870002@3402000000>
Call-ID: 108704431
CSeq: 1 REGISTER
Contact: <sip:34020000001180870002@192.168.1.8:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
01:38:09.293144 IP (tos 0x68, ttl 48, id 44466, offset 0, flags [DF], proto UDP (17), length 436)
39.129.72.95.64959 > 192.168.27.21.5060: [udp sum ok] SIP, length: 408
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.7:5060;rport;branch=z9hG4bK1838031091
From: <sip:34020000001180870003@3402000000>;tag=793428652
To: <sip:34020000001180870003@3402000000>
Call-ID: 1815643450
CSeq: 1 REGISTER
Contact: <sip:34020000001180870003@192.168.1.7:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0
01:38:09.540771 IP (tos 0x0, ttl 64, id 30488, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.50574: [udp sum ok] SIP
01:38:09.540806 IP (tos 0x0, ttl 64, id 30489, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.31137: [udp sum ok] SIP
01:38:09.540815 IP (tos 0x0, ttl 64, id 30490, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.15363: [udp sum ok] SIP
01:38:09.540822 IP (tos 0x0, ttl 64, id 45413, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.130.87.71.34950: [udp sum ok] SIP
01:38:09.540830 IP (tos 0x0, ttl 64, id 27774, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.128.225.81.35329: [udp sum ok] SIP
01:38:09.540838 IP (tos 0x0, ttl 64, id 30491, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.26505: [udp sum ok] SIP
01:38:09.540845 IP (tos 0x0, ttl 64, id 2845, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.64959: [udp sum ok] SIP
01:38:09.540860 IP (tos 0x0, ttl 64, id 30493, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.36321: [udp sum ok] SIP
01:38:09.540867 IP (tos 0x0, ttl 64, id 27775, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.128.225.81.35342: [udp sum ok] SIP
01:38:09.540873 IP (tos 0x0, ttl 64, id 30494, offset 0, flags [DF], proto UDP (17), length 64)
192.168.27.21.5060 > 39.129.72.95.23047: [udp sum ok] SIP
* */
https://github.com/jiechengyang/gb28181 這里有一個(gè)大佬寫(xiě)的 就是看不懂
特別說(shuō)明一下,本人對(duì)網(wǎng)絡(luò)有一定的了解,據(jù)我所知大部分網(wǎng)絡(luò)環(huán)境中的NAT防火墻UDP會(huì)話(huà)默認(rèn)超時(shí)時(shí)間大概為5分鐘-10分鐘左右。
因此只要UDP客戶(hù)端與服務(wù)器端每隔60秒進(jìn)行一次雙向的心跳交互,可以確保通信的持續(xù)進(jìn)行(當(dāng)有業(yè)務(wù)需求時(shí),任意一端可向另外一端發(fā)送所需要的數(shù)據(jù)報(bào)文),就像SIP一樣,哪怕你只發(fā)送個(gè)“hello”也是可以的。