$len = @fwrite($this->_socket, $send_buffer);
// send successful.
if ($len === strlen($send_buffer)) {
return true;
}
// Send only part of the data.
if ($len > 0) {
$this->_sendBuffer = substr($send_buffer, $len);
} else {
// Connection closed?
if (!is_resource($this->_socket) || feof($this->_socket)) {
self::$statistics++;
if ($this->onError) {
try {
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
$this->destroy();
return false;
}
$this->_sendBuffer = $send_buffer;
}
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
這段代碼中 當(dāng)$len != strlen($send_buffer)的時(shí)候,為什么不繼續(xù) fwrite將剩余的數(shù)據(jù)發(fā)送出去,而要去監(jiān)聽(tīng)
baseWrite函數(shù), 在baseWrite里將剩余數(shù)據(jù)發(fā)送出去
$len != strlen($send_buffer) 說(shuō)明緩沖區(qū)滿(mǎn)了,暫時(shí)無(wú)法再次發(fā)送數(shù)據(jù)了。
再次調(diào)用fwrite很大的幾率也是發(fā)送不了,所以要等待socket緩沖區(qū)的數(shù)據(jù)被對(duì)方接收socket緩沖區(qū)可寫(xiě)才能再次調(diào)用fwrite發(fā)送。
類(lèi)似一個(gè)物流公司,有一個(gè)人(進(jìn)程)負(fù)責(zé)發(fā)貨(發(fā)送數(shù)據(jù)),這個(gè)人(進(jìn)程)有很多倉(cāng)庫(kù)(鏈接)。一個(gè)倉(cāng)庫(kù)(鏈接)裝滿(mǎn)了(socket寫(xiě)緩沖區(qū)滿(mǎn)),再往里放貨物放不下了(fwrite返回0),所以要等待倉(cāng)庫(kù)的貨物全部或者部分發(fā)出去(操作系統(tǒng)把socket緩沖區(qū)的數(shù)據(jù)發(fā)給對(duì)方),倉(cāng)庫(kù)有空地兒才能再次向里面放貨物。但是這個(gè)人(進(jìn)程)又不能在那里傻等(阻塞等待),萬(wàn)一貨物永遠(yuǎn)都發(fā)不出去呢(進(jìn)程永久阻塞等待)?豈不是不是很虧,這個(gè)時(shí)間這個(gè)人(進(jìn)程)可以給其它倉(cāng)庫(kù)裝貨物呢(給其它鏈接發(fā)送數(shù)據(jù))。所以這個(gè)人設(shè)置了一個(gè)機(jī)制(io復(fù)用機(jī)制,類(lèi)似epoll),當(dāng)貨物發(fā)出倉(cāng)庫(kù)又有地方可以放新貨物的時(shí)候通知他(socket可寫(xiě)事件),這個(gè)人繼續(xù)向里面放貨物(調(diào)用baseWrite)。這樣這個(gè)人通過(guò)這種類(lèi)似通知機(jī)制(epoll)大大提高了整個(gè)物流系統(tǒng)的吞吐量。