首先感謝walkor大大創(chuàng)造了一個如此優(yōu)秀的php框架. 其中的多進程優(yōu)勢、異步IO、定時器和libevent事件輪詢庫、支持高并發(fā)等特性,讓我眼前為之一亮.
我看了手冊和demo,在做服務器方面已經(jīng)提供了很好地實例和說明,可我目前遇到一個需求:將php服務器模擬客戶端對外部服務器進行主動輪詢.
如示意圖:
[attach]67[/attach]
1.創(chuàng)建一個主進程(守護進程),一旦啟動長時間運行在后臺,即使關掉瀏覽器頁面.
主線程定時查詢數(shù)據(jù)庫(MySQL),一旦發(fā)現(xiàn)有符合條件的URL(可能多條),即創(chuàng)建對應的數(shù)量的子進程.
子進程也需要長時間存在,定時輪詢URL對應的服務器取回數(shù)據(jù).
子進程一旦取回所需要的數(shù)據(jù),將結(jié)果保存到數(shù)據(jù)庫,自我結(jié)束(或被主進程關閉).
目前的想法是在worker類里增加一個輪詢方法,但是感覺這樣破壞了框架結(jié)構(gòu).
難點:
1.如何創(chuàng)建子線程?
如何實現(xiàn)定時輪詢?
3.子線程如何自我關閉?
ps:
樓主從事iOS客戶端開發(fā),剛接觸PHP幾天,正在努力學習中,無奈項目期限太緊,苦思無果,前來寶地求助,希望能幫忙提供思路或給出簡單demo.
再次感謝walkor大大和熱心的朋友們.
首先贊一個,提問的非常有條理。
說說我的看法,
1、不能每個url一個進程,如果url數(shù)量控制不好,會造成創(chuàng)建太多進程導致服務器內(nèi)存資源耗盡
2、子進程不必自我結(jié)束,進程能復用就盡量復用
3、業(yè)務比較簡單,可以只開一個進程,并使用IO復用(workerman的異步IO或者curl_multi等),性能比多進程多線程更高
下面是一個定時器例子,只使用一個worker進程,定時查詢數(shù)據(jù)庫獲得url,并異步批量請求這些url。
文件名 :Applications/HttpPoll/start.php
<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
$worker->onWorkerStart = function(){
Timer::add(5, 'http_poll');
};
function http_poll()
{
$url_array = get_url_array_from_db();
// 這里使用的是workerman的異步IO AsyncTcpConnection。也可以使用curl,那樣對http支持更好一些
foreach($url_array as $url)
{
// 建立異步鏈接
$connection = new AsyncTcpConnection('tcp://'.$url.':80');
// $connection 是個對象,可以把一些數(shù)據(jù)以屬性的形式存儲進去,使用的時候再讀取。這里把url存起來
$connection->url = $url;
// 鏈接失敗時的處理
$connection->onError = function($connection, $err_no, $err_msg)
{
echo "\n!!!!!!!!!!!!{$connection->url}!!!!!!!!!!!!!!!\n","fail $err_no $err_msg";
};
// 一旦鏈接上服務端,則發(fā)起http請求
$connection->onConnect = function($connection)
{
$connection->send("GET / HTTP/1.1\r\nConnection: close\r\nHost: {$connection->url}\r\nAccept: text/html\r\nUser-Agent: Mozilla/5.0\r\n\r\n");
};
// 一旦收到數(shù)據(jù)打印。注意這里AsyncTcpConnection指定的是tcp,沒有處理協(xié)議,這里接收的數(shù)據(jù)是分段的http協(xié)議數(shù)據(jù)
$connection->onMessage = function($connection, $http_buffer)
{
echo "\n----------$connection->url---------\n{$http_buffer}\n";
$connection->close();
};
}
}
// 從數(shù)據(jù)庫中讀取url
function get_url_array_from_db()
{
return array('www.baidu.com', 'www.163.com', 'www.sina.com');
}
注意上面 http_poll 函數(shù)中使用的是workerman自帶的異步IO,AsycTcpConnection,由于我沒有實現(xiàn)客戶端的http協(xié)議(實現(xiàn)方法參見手冊協(xié)議訂制部分),這里僅使用了tcp,沒有分包,可能會導致onMessage中的$http_buffer是分段發(fā)來的。
http_poll中可用curl_multi_*
函數(shù)替換workerman的異步IO,也可以批量獲取url,并且對http協(xié)議支持的更好。使用方法及例子見: http://php.net/manual/en/function.curl-multi-exec.php
感謝walkor大大的耐心回復.
可是實際的需求比示意圖要復雜:
1.從數(shù)據(jù)庫中取出的不是一個簡單地URL,而是一個taskId,要根據(jù)taskId去讀取另一張表,要依次定時輪詢表中的URL.
Task A 中的URL_A_1 與 Task B中的URL_B_1 可能需要同時發(fā)起(比如在晚上9點整同時請求n臺服務器的數(shù)據(jù)).
該工程會在多臺服務器上分布式部署,實現(xiàn)集群效果.
所以如果"可以只開一個進程,并使用IO復用"的話,雖然也可以實現(xiàn),但是我擔心在讀寫數(shù)據(jù)庫時候會有阻塞,導致任務是依次執(zhí)行的,而達不到并發(fā)的要求.另外復用一個進程,也可能會增加任務調(diào)度的邏輯復雜度.
如果我控制好worker進程的數(shù)量,解決服務器內(nèi)存資源耗盡的問題后(如果2G內(nèi)存,每個worker進程占用5m,那么我控制最多創(chuàng)建 400個進程),是否還有其它的隱患?
我在您的demo基礎上改了一下,您看寫得對不對?
<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
$main_worker = new Worker();
$main_worker->onWorkerStart = function(){
Timer::add(5, 'sub_worker_create_poll');
};
$sub_worker_array = array();
function sub_worker_create_poll(){
$task_array = get_taskId_array_from_db();
foreach($task_array as $task_id)
{
if(empty($sub_worker_array)){
$sub_worker = new Worker();
$sub_worker->onWorkerStart = function(){
Timer::add(1, 'http_poll');
};
$sub_worker_array = $sub_worker;
}else{
echo "任務".$task_id."已創(chuàng)建進程";
}
}
}
function http_poll()
{
$task_id = ? //如何知道當前的函數(shù)是被哪個worker調(diào)用的? 然后取到$sub_worker_array對應的task_id?
$url_array = get_url_array_from_db_by_taskId($task_id);
// 這里使用的是workerman的異步IO AsyncTcpConnection。也可以使用curl,那樣對http支持更好一些
foreach($url_array as $url)
{
// 建立異步鏈接
$connection = new AsyncTcpConnection('tcp://'.$url.':80');
// $connection 是個對象,可以把一些數(shù)據(jù)以屬性的形式存儲進去,使用的時候再讀取。這里把url存起來
$connection->url = $url;
// 鏈接失敗時的處理
$connection->onError = function($connection, $err_no, $err_msg)
{
echo "\n!!!!!!!!!!!!{$connection->url}!!!!!!!!!!!!!!!\n","fail $err_no $err_msg";
};
// 一旦鏈接上服務端,則發(fā)起http請求
$connection->onConnect = function($connection)
{
$connection->send("GET / HTTP/1.1\r\nConnection: close\r\nHost: {$connection->url}\r\nAccept: text/html\r\nUser-Agent: Mozilla/5.0\r\n\r\n");
};
// 一旦收到數(shù)據(jù)打印。注意這里AsyncTcpConnection指定的是tcp,沒有處理協(xié)議,這里接收的數(shù)據(jù)是分段的http協(xié)議數(shù)據(jù)
$connection->onMessage = function($connection, $http_buffer)
{
echo "\n----------$connection->url---------\n{$http_buffer}\n";
$connection->close();
};
}
}
// 從數(shù)據(jù)庫中讀取task_id
function get_taskId_array_from_db()
{
return array('task_id_1', 'task_id_2', 'task_id_3');
}
// 根據(jù)task_id從數(shù)據(jù)庫中讀取對應的URLs
function get_url_array_from_db_by_taskId($task_id)
{
return array('www.baidu.com?param='.$task_id, 'www.163.com?param='.$task_id, 'www.sina.com?param='.$task_id);
}
@walkor
但是數(shù)據(jù)庫的taskid數(shù)量是不確定的,會動態(tài)變化,不知道該創(chuàng)建多少個?
是不是還得有個進程來管理這些子進程?