java nio 网络框架实现(转)

 

maven项目
https://github.com/solq360/common

  • 链式编/解码
  • 链路层链式处理
  • 管道管理socket
  • 多协议处理非常方便
  • 仿netty NioEventLoop 单线程串行处理

========
侍加功能 :

  • 自动化编/解码
  • rpc 接口增强使用

简单聊天例子

server

TestNioServer

//创建session管理工厂
ISessionFactory sessionFactory = new SessionFactory();
//创建编/解码管理
ICoderParserManager coderParserManager = new CoderParserManager();
//注册包编/解码,处理业务
coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
//创建ServerSocket 实例
ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);

//启动服务
serverSocket.start();
//阻塞当前线程
serverSocket.sync();
//关闭处理
serverSocket.stop();
	

client

TestNioClient
传统方式连接

	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		clientSocket.send("连接服务器成功");
		System.out.println("send ");
		this.cancel();
	    }
	}, 1000);

	//启动服务
	clientSocket.start();
	//阻塞当前线程
	clientSocket.sync();
	//关闭处理
	clientSocket.stop();

服务器方式连接

	//创建session管理工厂
	ISessionFactory sessionFactory = new SessionFactory();
	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		System.out.println("registerClientSocket");
		//主动连接服务器
		ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
		clientSocket.send("连接服务器成功");
		this.cancel();
	    }
	}, 1000);

	//启动服务
	serverSocket.start();
	//阻塞当前线程
	serverSocket.sync();
	//关闭处理
	serverSocket.stop();

源码实现过程

链式编/解码

  • 由 多个 ICoder 输入/输出转换处理
  • CoderParser 类组装多个 ICoder
  • 编/码处理器 注意优先级
  • nio read -> packageCoder -> link coders -> handle
  • handle write -> link coders -> packageCoder -> nio write
  • 由 ICoderParserManager 管理调用处理
public interface ICoderParserManager {

    /**
     * 解码处理
     *
     * @return CoderResult
     * */
    CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);

    /**
     * 编码处理
     * */
    ByteBuffer encode(Object message, ICoderCtx ctx);

    void error(ByteBuffer buffer, ICoderCtx ctx);

    /** 注册 编/码处理器 */
    void register(CoderParser coderParser);
}

其中核心
decode
encode

  @Override
    public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
	final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
	final ClientSocket clientSocket = socketChannelCtx.getClientSocket();

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    Object value = null;
	    synchronized (buffer) {
		// 已解析完
		if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
		// 包协议处理
		if (!packageCoder.verify(buffer, ctx)) {
		    continue;
		}
		// 包解析
		value = packageCoder.decode(buffer, ctx);
		if (value == null) {
		    // 包未读完整
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
	    }
	    // 链式处理
	    if (linkCoders != null) {
		for (ICoder coder : linkCoders) {
		    value = coder.decode(value, ctx);
		    if (value == null) {
			throw new CoderException("解码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 业务解码处理
	    value = handle.decode(value, ctx);
	    clientSocket.readBefore(socketChannelCtx, value);
	    handle.handle(value, ctx);
	    clientSocket.readAfter(socketChannelCtx, value);

	    return CoderResult.valueOf(ResultValue.SUCCEED);
	}
	return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
    }

    @Override
    public ByteBuffer encode(Object message, ICoderCtx ctx) {

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    // 业务检查
	    if (!handle.verify(message, ctx)) {
		continue;
	    }
	    // 业务编码处理
	    Object value = handle.encode(message, ctx);
	    // 链式处理
	    if (linkCoders != null) {
		for (int i = linkCoders.length - 1; i >= 0; i--) {
		    ICoder coder = linkCoders[i];
		    value = coder.encode(value, ctx);
		    if (value == null) {
			throw new CoderException("编码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 打包消息处理
	    value = packageCoder.encode(value, ctx);
	    if (value != null) {
		return (ByteBuffer) value;
	    }
	    throw new CoderException("编码出错  :" + packageCoder.getClass());
	}

	throw new CoderException("未找到编/解码处理器 ");
   }
  • 半包/帖包处理 : AbstractISocketChannel doRead方法摘要,根据解码返回的状态做处理。
  • 半包:当不是完成状态时,继续解码,从最后一次包索引开始处理
  • 帖包:当完成包解码移动包索引,等侍下轮解码处理
       boolean run = true;
    	    // 粘包处理
    	    while (run) {
    		ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
    		cpbuffer.mark();
    		CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
    		switch (coderResult.getValue()) {
    		case SUCCEED:
    		    break;
    		case NOT_FIND_CODER:
    		    final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
    		    final int headLimit = 255;
    		    if (readySize >= headLimit) {
    			throw new CoderException("未找到编/解码处理器 ");
    		    }
    		    run = false;
    		    break;
    		case UNFINISHED:
    		case UNKNOWN:
    		case ERROR:
    		default:
    		    run = false;
    		    // TODO throw
    		    break;
    		}
    	  }

http://www.cnblogs.com/solq/p/4585496.html

时间: 2024-08-02 00:16:37

java nio 网络框架实现(转)的相关文章

Java NIO编程的技巧和陷阱

去年做的分享,一直上传slideshare失败,今天又试了下,成功了.这个主题主要介绍Java NIO编程的技巧和陷阱,解读了一些NIO框架的源码,以及编写高性能NIO网络框架所需要注意的技巧和缺陷.关注这方面的朋友可以看一下.去年写了篇blog提供了pdf版本的下载,看这里. Nio trick and trap View more presentations from dennis zhuang 文章转自庄周梦蝶  ,原文发布时间2011-06-30

java nio框架netty 与tomcat的关系

问题描述 最近在研究http异步客户端. 看到了一堆名词,servlet3.0,jetty,tomcat,HttpAsyncClient netty,mina nio,nio2.0,iocp等等.略混乱   我的问题就是:netty跟tomcat是同样的概念么? netty官方说是个框架,那他是否还需要web容器支持?如果我客户端使用netty,服务端使用tomcat也是能连上的吧?是不是可以用netty的客户端和服务端 直接替换掉HttpAsyncClient和tomcat?    自己整理的

高性能Java网络框架 MINA

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架.当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发.串口通讯程序(只在最新的预览版中提供),MINA 所支持的功能也在进一步的扩展中. 简介       Apache MINA是一个网络应用程序框架,用来帮助用户简单地开发高性能和

高性能 Java 网络框架 Apache MINA 2.0.14

Apache MINA 2.0.14 发布了,该版本主要是 bug 修复,包括: Some closing session remaining in this state forever, leading to a leak A vulnerability when using OGNL Session weren't close immediately, leading to some cases where some messages could still be sent 详情可查阅[发行

从Netty到EPollSelectorImpl学习Java NIO

终于可以在写了几篇鸡汤文后,来篇技术文章了,:),题图是Trustin Lee,Mina/Netty都是他搞的,对Java程序员尤其是写通讯类的都产生了巨大影响,向他致敬! 在上周查一个内存OOM的问题之前,我一直觉得自己对Java NIO应该还是比较懂的,君不见N年前我曾经写过一篇<NFS-RPC框架优化过程(从37K到168K)>(尴尬的发现,上次导blog记录的时候竟然丢了一些文章,于是这文章link就不是自己的blog了),从那优化经历来说理论上对Netty的原理应该已经是比较清楚了才

攻破JAVA NIO技术壁垒

现在使用NIO的场景越来越多,很多网上的技术框架或多或少的使用NIO技术,譬如Tomcat,Jetty.学习和掌握NIO技术已经不是一个JAVA攻城狮的加分技能,而是一个必备技能.再者,现在互联网的面试中上点level的都会涉及一下NIO或者AIO的问题(AIO下次再讲述,本篇主要讲述NIO),掌握好NIO也能帮助你获得一份较好的offer. 驱使博主写这篇文章的关键是网上关于NIO的文章并不是很多,而且案例较少,针对这个特性,本文主要通过实际案例主要讲述NIO的用法,每个案例都经过实际检验.博

java NIO中的Reactor相关知识汇总 (转)

一.引子     nio是java的IO框架里边十分重要的一部分内容,其最核心的就是提供了非阻塞IO的处理方式,最典型的应用场景就是处理网络连接.很多同学提起nio都能说起一二,但是细究其背后的原理.思想往往就开始背书,说来说去都是那么几句,其中不少人并不见的真的很理解.本人之前就属于此类,看了很多书和博客,但是大多数都只是讲了三件套和怎么使用,很少会很细致的讲背后的思想,那本次我们就来扒一扒吧.     很多博客描述nio都是这么说的:基于Reactor模式实现的多路非阻塞高性能的网络IO.那

Java NIO之EPollSelectorImpl详解

这是滴滴的架构师欧阳康同学写的,非常赞,从EPollSelectorImpl到OS层面实现的详细解释,可以让大家对Java NIO的实现有更完整的理解,强烈推荐. 本文简述JDK1.7的NIO在linux平台上的实现,对java NIO的一些核心概念如Selector,Channel,Buffer等,不会做过多解释,这些请参考JDK的文档.JDK 1.7 NIO Selector在linux平台上的实现类是sun.nio.ch.EPollSelectorImpl,这个类通过linux下的epol

Java NIO 完全学习笔记(转)

本篇博客依照 Java NIO Tutorial 翻译,算是学习 Java NIO 的一个读书笔记.建议大家可以去阅读原文,相信你肯定会受益良多. 1. Java NIO Tutorial Java NIO,被称为新 IO(New IO),是 Java 1.4 引入的,用来替代 IO API的. Java NIO:Channels and Buffers 标准的 Java IO API ,你操作的对象是字节流(byte stream)或者字符流(character stream),而 NIO,你