基于java.nio.channels的编程实践-I

服务端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NIOSocketServer extends Thread {
	private static final Logger LOG = LoggerFactory
			.getLogger(NIOSocketServer.class);
	private static final String CHARSET = "UTF-8";
	private static final int BUFFER_SIZE = 1024;
	private static final int FAIL_TRY_NUM = 3;

	private Selector selector;
	private ServerSocketChannel ssc;
	private static NIOSocketServer server;

	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		server = new NIOSocketServer();
		try {
			// server.setDaemon(true);
			server.initServer();
			server.start();
		} catch (Exception e) {
			// 如果出现异常,则直接关闭客户端
			server.stopServer();
			System.exit(1);
		}
	}

	@Override
	public void run() {
		int failNum = 0;
		while (true) {
			try {
				int select = selector.select();
				if (select > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext()) {
						SelectionKey key = iter.next();
						if (key.isAcceptable()) {
							doAcceptable(key);
						}
						if (key.isWritable()) {
							doWriteMessage(key);
						}
						if (key.isReadable()) {
							doReadMessage(key);
						}
						if (key.isConnectable()) {
							doConnectable(key);
						}
						iter.remove();
					}
				}
			} catch (Exception e) {
				failNum++;
				if (failNum > FAIL_TRY_NUM) {
					server.stopServer();
				}
			}
		}

	}

	/**
	 * 初始化服务器端程序,开始监听端口
	 * 
	 * @throws IOException
	 */
	private void initServer() throws IOException {
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(2181));
		ssc.register(selector, SelectionKey.OP_ACCEPT);
	}

	/**
	 * 停止服务器
	 * 
	 * @throws IOException
	 */
	private void stopServer() {
		try {
			if (selector != null && selector.isOpen()) {
				selector.close();
			}
			if (ssc != null && ssc.isOpen()) {
				ssc.close();
			}
		} catch (IOException e) {
			LOG.info("关闭服务端失败:" + e.getMessage());
		}
	}

	/**
	 * 对新的客户端连接进行处理
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doAcceptable(SelectionKey key) throws IOException {
		ServerSocketChannel tmpSsc = (ServerSocketChannel) key.channel();
		SocketChannel ss = tmpSsc.accept();
		ss.configureBlocking(false);
		ss.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

	}

	/**
	 * 已连接
	 * 
	 * @param key
	 */
	private void doConnectable(SelectionKey key) {
		LOG.info("connect is ok");
	}

	/**
	 * 写消息到客户端
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doWriteMessage(SelectionKey key) throws Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.wrap("server write msg to client"
				.getBytes(CHARSET));
		while (buffer.hasRemaining()) {
			sc.write(buffer);
		}
		TimeUnit.SECONDS.sleep(1);
	}

	/**
	 * @param key
	 * @throws IOException
	 */
	private void doReadMessage(SelectionKey key) throws Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
		int read = sc.read(bb);
		while (read > 0) {
			bb.flip();
			byte[] barr = new byte[bb.limit()];
			bb.get(barr);
			LOG.info("server read msg from client:" + new String(barr, CHARSET));
			bb.clear();
			read = sc.read(bb);
		}
		TimeUnit.SECONDS.sleep(1);
	}

}

客户端代码

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NIOSocketClient extends Thread {
	private static final Logger LOG = LoggerFactory
			.getLogger(NIOSocketClient.class);
	private static final String CHARSET = "UTF-8";
	private static final int BUFFER_SIZE = 1024;
	private static final int FAIL_TRY_NUM = 3;

	private SocketChannel socketChannel;
	private Selector selector;
	private static NIOSocketClient client;

	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		client = new NIOSocketClient();
		try {
			client.initClient();
			client.start();
			// client.setDaemon(true);
		} catch (Exception e) {
			// 如果出现异常,则直接关闭客户端
			client.close();
		}
	}

	public void run() {
		int failNum = 0;
		while (true) {
			try {
				writeMessage();
				int select = selector.select();
				System.out.println(select);
				if (select > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext()) {
						SelectionKey sk = iter.next();
						if (sk.isReadable()) {
							readMessage(sk);
						}
						iter.remove();
					}
				}
			} catch (Exception e) {
				// 如果出现三次以上的异常,则关闭客户端
				failNum++;
				if (failNum > FAIL_TRY_NUM) {
					client.close();
					System.exit(1);
				}
			}
		}
	}

	public void readMessage(SelectionKey sk) throws Exception,
			UnsupportedEncodingException {
		SocketChannel curSc = (SocketChannel) sk.channel();
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		while (curSc.read(buffer) > 0) {
			buffer.flip();
			LOG.info("read message from server:"
					+ new String(buffer.array(), CHARSET));
			buffer.clear();
		}
		TimeUnit.SECONDS.sleep(1);
	}

	public void writeMessage() throws Exception {
		String ss = "client write msg to server";
		ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes(CHARSET));
		while (buffer.hasRemaining()) {
			socketChannel.write(buffer);
		}

		TimeUnit.SECONDS.sleep(1);
	}

	public void initClient() throws IOException, ClosedChannelException {
		InetSocketAddress addr = new InetSocketAddress(2181);
		socketChannel = SocketChannel.open();

		selector = Selector.open();
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);

		// 连接到server
		socketChannel.connect(addr);

		while (!socketChannel.finishConnect()) {
			LOG.info("check finish connection");
		}
	}

	/**
	 * 停止客户端
	 */
	private void close() {
		try {
			if (selector != null && selector.isOpen()) {
				selector.close();
			}
			if (socketChannel != null && socketChannel.isOpen()) {
				socketChannel.close();
			}
		} catch (IOException e) {
			LOG.info("关闭客户端失败:" + e.getMessage());
		}
	}

}
时间: 2024-09-19 09:48:53

基于java.nio.channels的编程实践-I的相关文章

基于java.nio.channels的编程实践-II

介绍 为了提供并发处理效率,把用户的请求连接随机分配到线程池的线程进行处理,hbase也是采用同样的方式处理用户请求的 客户端代码可以参考:基于java.nio.channels的编程实践-I 代码 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.chan

基于java nio的memcached客户端

1.xmemcached是什么? xmemcached是基于java nio实现的memcached客户端API. 实际上是基于我实现的一个简单nio框架 http://code.google.com/p/yanf4j/的基础上实现的(目前是基于yanf4j 0.52),核心代码不超过1000行,序列化机制直接挪用spymemcached的Transcoder. 性能方面,在读写简单类型上比之spymemcached还是有差距,在读写比较大的对象(如集合)有效率优势. 当前0.50-beta版本

缓冲区-java NIO服务器并发编程

问题描述 java NIO服务器并发编程 各位CSDN的编程大神们!求帮忙,求解决方案!跪求! 最近开发一个基于nio的服务器端程序(也包括客户端),实现一个多人(很多人)并发进行即时通讯的东东...发觉这个NIO太难搞了,不过总算还是能够建立一个连接发发"hello world"之类的东西,但是问题来了.由于NIO是针对缓冲区进行操作的,所有数据只能够写入到缓冲区中(我用的是byteBuffer),完了我自定义了一个数据包的格式,如下:|包头一个字节|包长度四个字节|包内容长度可变|

《Java线程与并发编程实践》—— 导读

前言 Java线程与并发编程实践 线程和并发工具并非尤物,但是它们是正式应用的重要部分.本书会向你介绍Java 8 Update 60中线程特性以及并发工具的大部分内容. 第1章介绍了类Thread和接口Runnable.你会学习如何创建Thread以及Runnable对象,获取和设置线程状态.启动线程.中断线程,将一条线程插入另外一条线程以及触发线程睡眠. 第2章关注同步.学习后你会解决一些问题,如没有同步就无法解决的竞态条件.你也能学到如何创建同步方法.块,以及如何使用忽略互斥访问的轻量级同

《Java线程与并发编程实践》- 第1章 Thread和Runnable

第1章 Thread和Runnable Java线程与并发编程实践 Java程序是通过线程执行的,线程在程序中具有独立的执行路径.当多条线程执行时,它们彼此之间的路径可以不同.举个例子,一条线程可能在执行switch语句的某个case分支,另一条线程很可能在执行其他case分支. 每个Java应用程序都有一个执行main()函数的默认主线程.应用程序也可以创建线程在后台操作时间密集型任务,以确保对用户的响应.这些封装了代码执行序列的线程对象就被称为runnable. Java虚拟机给每条线程分配

基于java nio的memcached客户端——xmemcached

1.xmemcached是什么? xmemcached是基于java nio实现的memcached客户端API. 实际上是基于我实现的一个简单nio框架 http://code.google.com/p/yanf4j/的基础上实现的(目前是基于yanf4j 0.52),核心代码不超过1000行,序列化机制直接挪用spymemcached的Transcoder. 性能方面,在读写简单类型上比之spymemcached还是有差距,在读写比较大的对象(如集合)有效率优势. 当 前0.50-beta版

《Java线程与并发编程实践》—— 第2章 同步 2.1 线程中的问题

第2章 同步 Java线程与并发编程实践 线程交互通常是通过共享变量完成的,当线程之间没有交互时,开发多线程的应用程序会变得简单许多.一旦发生了交互,很多诱发线程不安全(在多线程环境下不正确)的因素就会暴露出来.在这一章中,你将会认识到这些问题,同时也会学习如何正确地使用Java面向同步的特性来克服它们. 2.1 线程中的问题 Java对线程的支持促进了响应式.可扩展应用程序的发展.不过,这样的支持是以增加复杂性作为代价的.如果不多加小心,你的代码就会到处充斥着极难以察觉的bug,而这些bug多

Java 5.0多线程编程实践

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供 了丰富的API多线程编程在Java 5中更加容易,灵活.本文通过一个网络服务器 模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列 ,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一 个新特性泛型. 简介 本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一 个新线程为该连接服务,服务内容为往客户端输送一些字符信息.一个

《Java线程与并发编程实践》—— 1.2 操作更高级的线程任务

1.2 操作更高级的线程任务 之前的线程任务都和如何配置一个线程对象以及启动关联的线程相关.不过,Thread类也能支持更多高级的任务,包括中断其他线程.将线程join到另一条线程中以及致使线程睡眠. 1.2.1 中断线程 Thread类提供了一种线程可以中断其他线程的机制.当一个线程被中断时,它会抛出java.lang.InterruptedException.这一机制由下面的3种方法构成. void interrupt():中断调用此方法的Thread对象所关联的线程.当一条线程由于调用了T