`

Java NIO 02 - 常识篇

阅读更多

   (一)、回顾一下上一篇01:
(1)、NIO的几个概念:
①、Buffer :内存块,实质就是一个数组。NIO数据读或写得中转地。
②、Channel:连接设备的通道。用于向buffer提供数据或者读取数据,异步I/O支持。
③、Selector :channel事件的监听者,他能检测到一个或多个通道,并将事件分发出去
④、SelectionKey:channel上发生的事件,包含了事件的状态信息和时间以及对应的channel。
(2)、在前面总结一中,最后的时候给出了一个完整的关于NIO操作网络套接字的例子,在这里先总结一下构建基于NIO的服务端的一般步骤:

①、构造一个Selector
②、打开一个serverSocketChannel
③、设定serverSocketChannel为非阻塞
④、绑定socketserverChannel到一个主机地址和端口
⑤、注册selector并告知感兴趣的事情
(3)、Channel的状态有四种:
①、Connectable:当一个Channel完成socket连接操作已完成或者已失败。
②、Acceptable:当一个Channel已准备好接受一个新的socket连接时,channel是Acceptale
③、Readable:当一个channel能被读时。
④、Writable:当一个Channel能被写时为可写状态。
(4)、下面是NIO中的关系图,来自于《java编程思想》


 (二)、 基于多线程的NIO
总结一的例子,是基于单线程的,单线程的好处是简单,不用去考虑过于复杂的线程问题,但是仔细想一下,如果数据在网络传输的过程中发生了阻塞呢,那岂不是要花费很多的时间?再者如果我们要实现像QQ中的聊天室呢,如何实现呢?。为了解决这些问题,我们现在试着采用多线程的,但是采用多线程,会产生很多线程,创建、销毁线程都是要花费时间的,所以这里可以运用到线程池来管理。
下面一个例子是:客户端发来信息,服务端然后转发所有的信息给在线的客户端。 

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class RSocketServer implements Runnable {
	private final static int POOLSIZE = 100;// 处理线程池的大小
	private SelectionKey selectionKey; // 选择键
	private ExecutorService service = Executors.newFixedThreadPool(POOLSIZE);// 固定大小的线程池
	private boolean isRunning = true;
	private Selector selector;// 选择器
	private String writeMsg;// 需要写的信息
	private ServerSocketChannel ssc;
	public RSocketServer() {
		try {
			selector = Selector.open();
			ssc = ServerSocketChannel.open();
			ssc.configureBlocking(false);
			ssc.socket().bind(new InetSocketAddress(8080));
			selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("服务器启动成功!正在端口为8080上等待...");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	public void run() {
		try {
			while (isRunning) {
				int num = -1;
				try {
					// 监控注册在selector上的SelectableChannel
					num = selector.select();
				} catch (IOException e) {
					e.printStackTrace();
				}
				if (num == 0) {
					continue;
				}
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (!key.isValid())
						continue;
					if (key.isAcceptable()) {
						getConn(key);
					} else if (key.isReadable()) {
						System.out.println("可读");
						readMsg(key);
					}else if (key.isValid() && key.isWritable()) {
						if (writeMsg != null) {
							System.out.println("可写");
							RWriter(key);
						}
					}
					else break;
				}
			}
			Thread.yield();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	private void getConn(SelectionKey key) throws IOException {
		ssc = (ServerSocketChannel) key.channel();
		SocketChannel sc = ssc.accept();
		sc.configureBlocking(false);
		sc.register(selector, SelectionKey.OP_READ);
	}
	private void readMsg(SelectionKey key) throws IOException {
		StringBuffer sb = new StringBuffer();
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		buffer.clear();
		int len = 0;
		while ((len = sc.read(buffer)) > 0) {
			buffer.flip();
			sb.append(new String(buffer.array(), 0, len));
		}
		if (sb.length() > 0)
			System.out.println("从客户端发来的数据:" + sb.toString());
		if ("exit".equals(sb.toString().trim())||sb.length()==0) {
			sc.write(ByteBuffer.wrap("bye".getBytes()));
			System.out.println("服务端已经关闭");
			key.cancel();
			sc.close();
			sc.socket().close();
		} else {
			String msg = sc.socket().getRemoteSocketAddress() + ":"
					+ sb.toString();
			Iterator<SelectionKey> it = key.selector().keys().iterator();
			// 把数据分发到每一个已经连接的客户端
			while (it.hasNext()) {
				SelectionKey skey = it.next();
				if (skey != key && skey != selectionKey) {
					RWriter myWriter = new RWriter(skey, msg);
					service.execute(myWriter);
				}
			}
		}
	}
	public static void main(String[] args) {
		RSocketServer server = new RSocketServer();
		new Thread(server).start();
	}
	class RWriter implements Runnable {
		SelectionKey key;
		String msg;
		public RWriter(SelectionKey key, String msg) {
			this.key = key;
			this.msg = msg;
		}
		public void run() {
			try {
				SocketChannel client = (SocketChannel) key.channel();
				client.write(ByteBuffer.wrap(msg.getBytes()));
				Thread.yield();
			} catch (IOException ex) {
				Logger.getLogger(RWriter.class.getName()).log(Level.SEVERE,
						null, ex);
			}
		}
	}
	private void RWriter(SelectionKey key) throws IOException {
		SocketChannel sc = (SocketChannel) key.channel();
		String str = (String) key.attachment();
		sc.write(ByteBuffer.wrap(str.getBytes()));
		key.interestOps(SelectionKey.OP_READ);
	}
}

 (三)、 Java NIO的Reactor模式

        上面的例子,有没有发觉一些问题呢?会不会觉得例子有点“乱七八糟”的感觉,接收数据、业务逻辑、发送数据、数据的包装等等,这些都需要我们去处理。但是往往我们只是对业务逻辑感兴趣。于是我们能不能只是处理业务逻辑,而其他的都包装好(就像java web中的Servlet,我们只需要重写doPost()/doGet()来处理业务逻辑,其他的都不用管)?带着这些问题去google了一下,但是结果是建议使用一些java nio的框架(Mina、Cindy等),因为这些已经封装好了,只是使用一些API就行了。这样也是一种方法,但是,问题是不知道如何进行底层的实现。经过这段日子带着问题查阅资料,原来才知道他们是借Java NIO的Reactor模式来实现的,只是对现有的Reactor模式架构做了一层封装。那何为Reactor模式呢?(下面有些地方参考《Scalable IO in java》、(http://www.jdon.com/concurrent/nio.pdf))
(1)、Reactor模式中的几个概念:
  ①、Reactor(反应器):负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理。
  ②、Handlers(句柄):负责处理非阻塞的行为,标识系统管理的资源;同时将handlers与事件绑定;
  ③、Event Handler(事件处理器):以特定的方式来处理相应的事件。
下面图为基于Java NIO的Reactor模式

(2)、Java实现NIO Reactor模式
①、Reactor类,实现NIO的创建以及事件的监听。(关键代码)
 selector = Selector.open();
 。。。(略,上面的例子中有)
  SelectionKey sk = serverSocket.register(…);
 //这里利用sk的attach绑定Acceptor,如果有事件,则触//发Acceptor,相当于swing中的addListener(Listener)
  Sk.attach(new Acceptor());
②、任务分派处理,相应运行任务。
Void dispatch(selectionKey key){
 Runnable r = (Runnable)it.next();
 if(r!=null)
  r.run(); 
}
③、Reactor里内置Acceptor类,实现分派任务操作处理。
  try{
SocketChannel c = serverSocket.accept();
      if (c != null)
        //调用Handler来处理channel
        new SocketReadHandler(selector, c);
      }
}
④、Handler类,负责处理请求。
 Handler(Selector sel, SocketChannel c)
    throws IOException {
    socket = c;
    socket.configureBlocking(false);
     sk = socket.register(sel, 0);
    //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
    sk.attach(this);
    //同时将SelectionKey标记为可读,以便读取。
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }
了解了这些底层基础的知识,就可以学习一些NIO框架了,比如Apache Mina框架或者其他的等等。学起来也会比较的清晰。
(四)、 Java NIO  小结
(1)、Java NIO 与 流IO,在不同的方面各自有优势。在性能上,NIO系统比传统流IO有优势。但是如果我们要在服务器端实现同步通信、数据不必在读取前进行处理,那么可以考虑用流IO。如果服务端系统需要进行异步通信、有频繁的数据处理,那么可以考虑用NIO。
(2)、要想深入了解Java NIO的机制,先要了解他的基本概念,这个很重要。其次,要理解Java NIO的Reactor模式,那么多的java NIO框架都是基于他的,可以看出他的重要性和实用性。
      这篇文章,我一年前就在百度文库那里发表过(Doc文档的形式),如果想下载,可以到http://wenku.baidu.com/view/2154350f581b6bd97f19eac0.html 下载。

  • 大小: 93.9 KB
  • 大小: 26 KB
1
3
分享到:
评论
2 楼 JimmyHR 2013-03-29  
呵呵常来踩,继续有更新博文
1 楼 jiakechong 2013-03-28  
nio知识受用

相关推荐

Global site tag (gtag.js) - Google Analytics