php socket服务的模型以及实现 多进程IO复用libevent

端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题.

<?php
/**
 * 多进程IO复用libevent
 * 同时处理多个连接
 * 端口复用---建议php7
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;
    public $process_num = 2;
    private $pids = array();
    public $receive = array();
    private  $host='127.0.0.1';
    private $port = 1215;

    function __construct($host="0.0.0.0",$port=1215){
        //产生子进程分支
        $pid = pcntl_fork();
        if ($pid == -1) {
            die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
        } else if ($pid) {
            exit(); //父进程中pcntl_fork返回创建的子进程进程号
        } else {
            // 子进程pcntl_fork返回的时0
        }
        // 从当前终端分离
        if (posix_setsid() == -1) {
            die("could not detach from terminal");
        }
        umask(0);
        $this->host = $host;
        $this->port = $port;
    }

    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} \r\n";
                exit;
            case 0:
                $context_option['socket']['so_reuseport'] = 1;
                $context = stream_context_create($context_option);
                $this->socket = stream_socket_server("tcp://".$this->host.":".$this->port, $errno, $errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);
                if (!$this->socket) die($errstr."--".$errno);
                stream_set_blocking($this->socket,0);
                $id = (int)$this->socket;
                $this->master[$id] = $this->socket;
                $base = event_base_new();
                $event = event_new();
                event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
                event_base_set($event, $base);
                event_add($event);
                echo   posix_getpid()." start run...\n";
                event_base_loop($base);
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }

    public function run(){

        for($i = 1; $i <= $this->process_num; $i++){
            $this->start_worker_process();
        }

        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);

                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }

    public function ev_accept($socket, $flag, $base){
        $connection = @stream_socket_accept($socket);
        echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "\n";
        if ( !$connection ){
            return;
        }
        stream_set_blocking($connection, 0);
        $id = (int)$connection;
        if($this->onConnect) {
            call_user_func($this->onConnect, $connection);
        }
        $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);
        $this->master[$id] = $connection;
        $this->buffer[$id] = $buffer;
        $this->receive[$id] = '';
    }

    function ev_read($buffer, $id)
    {
        while( 1 ) {
            $read = event_buffer_read($buffer, 3);
            if($read === '' || $read === false)
            {
                break;
            }
            $pos = strpos($read, "\n");
            if($pos === false)
            {
                $this->receive[$id] .= $read;
                //echo "received:".$read.";not all package,continue recdiveing\n";
            }else{
                $this->receive[$id] .= trim(substr ($read,0,$pos+1));
                $read = substr($read,$pos+1);
                if($this->onMessage)
                {
                    call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
                }
                switch ( $this->receive[$id] ){
                    case "quit":
                        echo "client close conn\n";
                        if($this->onClose) {
                            call_user_func($this->onClose, $this->master[$id]);
                        }
                        fclose($this->master[$id]);
                        break;
                    default:
                        //echo "all package:\n";
                        //echo $this->receive[$id]."\n";
                        break;
                }
                $this->receive[$id]='';
            }
        }
    }

    function ev_write($buffer, $id)
    {
        echo "$id -- " ."\n";
    }

    function ev_error($buffer, $error, $id)
    {
        echo "ev_error - ".$error."\n";
    }

}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
};

$server->run();
   经过多次服务模型的演变,基本我们实现了一个高性能的服务模型!

时间: 2024-10-24 16:09:52

php socket服务的模型以及实现 多进程IO复用libevent的相关文章

php中socket服务的模型下的编程方式(同步和异步)

    前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下.     如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:     1:多启动进程,提高并发数     2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数     3:异步编程,避免阻塞,提高并发数     这里我们重点介绍第三种

Mpass – PHP做Socket服务的解决方案

一般很少有用PHP做服务的, 但是如果你的已有业务逻辑都是PHP实现的, 而现在却需要基于已有的业务逻辑提供一套Socket服务, 怎么办? 当然, 解决方法很多, 但最简单的办法, 还是直接使用PHP做Socket服务. 而这样做要解决的问题有很多, 主要要解决的问题有如下几个: 1. 作为后台服务,需要常驻后台运行, 那么丁点的内存泄露都是不能接受的. 2. 作为后台服务,畸形数据导致进程异常退出, 也是不可接受的. 3. 作为后台服务, 要能做到graceful restart. 4. 作

很幽默的讲解六种Socket I/O模型

很幽默的讲解六种Socket I/O模型   信息来源:幻影论坛     作 者: flyinwuhan (制怒·三思而后行) 本文简单介绍了当前Windows支持的各种Socket I/O模型,如果你发现其中存在什么错误请务必赐教. 一:select模型二:WSAAsyncSelect模型三:WSAEventSelect模型四:Overlapped I/O 事件通知模型五:Overlapped I/O 完成例程模型六:IOCP模型 老陈有一个在外地工作的女儿,不能经常回来,老陈和她通过信件联系

基于Delphi的Socket I/O模型全接触

老陈有一个在外地工作的女儿,不能经常回来,老陈和她通过信件联系.他们的信会被邮递员投递到他们的信箱里. 这和Socket模型非常类似.下面我就以老陈接收信件为例讲解Socket I/O模型. 一:select模型 老陈非常想看到女儿的信.以至于他每隔10分钟就下楼检查信箱,看是否有女儿的信,在这种情况下,"下楼检查信箱"然后回到楼上耽误了老陈太多的时间,以至于老陈无法做其他工作. select模型和老陈的这种情况非常相似:周而复始地去检查......如果有数据......接收/发送..

Java基于socket服务实现UDP协议的方法

  本文实例讲述了Java基于socket服务实现UDP协议的方法.分享给大家供大家参考.具体如下: 示例1: 接收类: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.socket.demo; import java.io.IOException; import java.net.DatagramPacket; import java.net

六种Socket I/O模型幽默讲解

本文简单介绍了当前Windows支持的各种Socket I/O模型. 老陈有一个在外地工作的女儿,不能经常回来,老陈和她通过信件联系.他们的信会被邮递员投递到他们的信箱里.这和Socket模型非常类似.下面就以此为例讲解Socket I/O模型. 零:阻塞模型 老陈非常想看女儿的信,以至于他什么都不做,就站在门口等.直到接到邮递员给他的信件才开心的看信回信. 这就是阻塞模型,进程阻塞在socket的接收函数上.   一:select模型 但是不吃不喝一直站门口等着总不行吧.所以他每隔10分钟就下

socket服务端必须写在wcf或winform中吗

问题描述 没做过socket,现在要做socket的服务端,我现在将代码写在web项目的类库中,不清楚socket服务是否随着web项目启动而启动,还是需要重新起一个winform或wcf项目,请指点下呀 解决方案 解决方案二:和通常的socket一样写就可以了解决方案三:需要单独写一个socket程序,可以是控制台,winform,wpf,跟wcf不是一回事.解决方案四:写个单独服务,部署在服务器.解决方案五:也可以卸载web项目的gloax文件中解决方案六:也可以windowsservice

golang(4):编写socket服务,简单支持命令

本文的原文连接是: http://blog.csdn.net/freewebsys/article/details/46881213 转载请必须注明出处! 1,socket服务 使用golang开发socket服务还是非常简单的. socket的库都封装好了. 参考文档: https://github.com/astaxie/build-web-application-with-golang/blob/master/zh/08.1.md 2,简单例子 package main import (

java ocket 反向-java Socket服务端反向查数据

问题描述 java Socket服务端反向查数据 需求是 两个java web项目,用socket实现数据即时查询 client向server注册,并保持连接 server向client发送请求,client返回数据. 我现在不知道怎么用ServerSocket向Socket对象发送请求,并且能得到回复. 帮我考虑一下,多谢. 解决方案 http://bbs.51cto.com/thread-1084435-1.html 一个简单的Demo 解决方案二: 看一下例子就行了http://blog.