前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。
如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:
1:多启动进程,提高并发数
2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数
3:异步编程,避免阻塞,提高并发数
这里我们重点介绍第三种方法,以访问第三方http为例。
代码如下:
<?php
//同步读取
function get_data_blocking(){
$socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n");
$str = "";
while (!feof($socket)) {
$str .= fgets($socket, 1024);
}
fclose($socket);
return $str;
}
//异步读取
function get_data_unblocking(){
$socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
stream_set_blocking($socket, 0);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n");
$write = NULL;
$except = NULL;
while( $socket ){
$read = array($socket);
$num_changed_streams = stream_select($read, $write, $except, 0);
if ( $num_changed_streams > 0 ) {
foreach($read as $r){
$str = fread($r,2048);
fclose($socket);
$socket = false;
return $str;
}
}
usleep(100);
}
}
//真正的异步读取--利用server的IO复用事件来提高并发
class Get_data_event{
public $onMessage = null;
private $str='';
function __construct(&$server){
$socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6);
stream_set_blocking($socket, 0);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.xtgxiso.cn\r\nAccept: */*\r\n\r\n");
$server->add_socket($socket, array($this, 'read'));
}
public function read($socket){
while (1) {
$buffer = fread($socket, 1024);
if ($buffer === '' || $buffer === false) {
break;
}
$this->str .= $buffer;
}
if( $this->onMessage && $this->str ) {
call_user_func($this->onMessage, $this->str);
}
$this->str = '';
return false;
}
}
/**
* 单进程IO复用select
*/
class Xtgxiso_server
{
public $socket = false;
public $master = array();
public $onConnect = null;
public $onMessage = null;
public $other_socket_callback = array();
function __construct($host="0.0.0.0",$port=1215)
{
$this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
if (!$this->socket) die($errstr."--".$errno);
stream_set_blocking($this->socket,0);
$id = (int)$this->socket;
$this->master[$id] = $this->socket;
}
public function add_socket($socket,$callback){
$id = (int)$socket;
$this->master[$id] = $socket;
$this->other_socket_callback[$id] = $callback;
}
public function run(){
$read = $this->master;
$receive = array();
echo "start run...\n";
while ( 1 ) {
$read = $this->master;
//echo "waiting...\n";
$mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
if ($mod_fd === FALSE) {
break;
}
foreach ( $read as $k => $v ) {
$id = (int)$v;
if ( $v === $this->socket ) {
//echo "new conn\n";
$conn = stream_socket_accept($this->socket);
if ($this->onConnect) {
call_user_func($this->onConnect, $conn);
}
$id = (int)$conn;
$this->master[$id] = $conn;
} else if ( @$this->other_socket_callback[$id] ){
call_user_func_array($this->other_socket_callback[$id], array($v));
} else {
//echo "read data\n";
if ( !isset($receive[$k]) ){
$receive[$k]="";
}
$buffer = fread($v, 1024);
//echo $buffer."\n";
if ( strlen($buffer) === 0 ) {
if ( $this->onClose ){
call_user_func($this->onClose,$v);
}
fclose($v);
$id = (int)$v;
unset($this->master[$id]);
} else if ( $buffer === FALSE ) {
if ( $this->onClose ){
call_user_func($this->onClose, $this->master[$key_to_del]);
}
fclose($v);
$id = (int)$v;
unset($this->master[$id]);
} else {
$pos = strpos($buffer, "\r\n\r\n");
if ( $pos === false) {
$receive[$k] .= $buffer;
//echo "received:".$buffer.";not all package,continue recdiveing\n";
}else{
$receive[$k] .= trim(substr ($buffer,0,$pos+4));
$buffer = substr($buffer,$pos+4);
if($this->onMessage) {
call_user_func($this->onMessage,$v,$receive[$k]);
}
$receive[$k]='';
}
}
}
}
usleep(10000);
}
}
}
$server = new Xtgxiso_server();
$server->onConnect = function($conn){
echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
};
$server->onMessage = function($conn,$msg) use ( $server ) {
/*
$respone ="";//响应内容
$respone = "HTTP/1.1 200 OK\r\n";
$respone .= "Server: openresty\r\n";
$respone .= "Content-Type: text/html; charset=utf-8\r\n";
$body = time().rand(111111,999999);
$len = strlen($body);
$respone .= "Content-Length:$len\r\n";
$respone .= "Connection: close\r\n";
$respone .= "\r\n$body\r\n\r\n";
echo "onMessage --" . $msg . "\n";
*/
//同步读取
//$respone = get_data_blocking();
//fwrite($conn,$respone);
//异步读取
//$respone = get_data_unblocking();
//fwrite($conn,$respone);
//真正异步
$data = new Get_data_event($server);
$data->onMessage = function($str) use($conn){
fwrite($conn,$str);
};
};
$server->onClose = function($conn){
echo "onClose --" . "\n";
};
$server->run();
第三方服务sleep1.php的代码比较简单
<?php
sleep(1);//模拟耗时
echo "OK";
通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.
<script src="http://127.0.0.1:1215/?id=1"></script>
<script src="http://127.0.0.1:1215/?id=2"></script>
<script src="http://127.0.0.1:1215/?id=3"></script>
通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.