<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    《Netty 权威指南》—— NIO创建的TimeServer源码分析

    声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。

    我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码:

    public class TimeServer {
    
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
    	int port = 8080;
    	if (args != null &amp;&amp; args.length &gt; 0) {
    	    try {
    		port = Integer.valueOf(args[0]);
    	    } catch (NumberFormatException e) {
    		// 采用默认值
    	    }
    	}
    	MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
    	New Thread(timeServer, &quot;NIO-MultiplexerTimeServer-001&quot;).start();
        }
    }
    
    

     

    我们对NIO创建的TimeServer进行下简单分析,8-15行跟之前的一样,设置监听端口。16-17行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入,现在我们继续看MultiplexerTimeServer的源码:

    public class MultiplexerTimeServer implements Runnable {
    
        private Selector selector;
    
        private ServerSocketChannel servChannel;
    
        private volatile boolean stop;
    
        /**
         * 初始化多路复用器、绑定监听端口
         * 
         * @param port
         */
        public MultiplexerTimeServer(int port) {
    	try {
    	    selector = Selector.open();
    	    servChannel = ServerSocketChannel.open();
    	    servChannel.configureBlocking(false);
    	    servChannel.socket().bind(new InetSocketAddress(port), 1024);
    	    servChannel.register(selector, SelectionKey.OP_ACCEPT);
    	    System.out.println(&quot;The time server is start in port : &quot; + port);
    	} catch (IOException e) {
    	    e.printStackTrace();
    	    System.exit(1);
    	}
        }
    
        public void stop() {
    	this.stop = true;
        }
    
        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
    	while (!stop) {
    	    try {
    		selector.select(1000);
    		Set&lt;SelectionKey&gt; selectedKeys = selector.selectedKeys();
    		Iterator&lt;SelectionKey&gt; it = selectedKeys.iterator();
    		SelectionKey key = null;
    		while (it.hasNext()) {
    		    key = it.next();
    		    it.remove();
    		    try {
    			handleInput(key);
    		    } catch (Exception e) {
    			if (key != null) {
    			    key.cancel();
    			    if (key.channel() != null)
    				key.channel().close();
    			}
    		    }
    		}
    	    } catch (Throwable t) {
    		t.printStackTrace();
    	    }
    	}
    
    	// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
    	if (selector != null)
    	    try {
    		selector.close();
    	    } catch (IOException e) {
    		e.printStackTrace();
    	    }
        }
    
        private void handleInput(SelectionKey key) throws IOException {
    
    	if (key.isValid()) {
    	    // 处理新接入的请求消息
    	    if (key.isAcceptable()) {
    		// Accept the new connection
    		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    		SocketChannel sc = ssc.accept();
    		sc.configureBlocking(false);
    		// Add the new connection to the selector
    		sc.register(selector, SelectionKey.OP_READ);
    	    }
    	    if (key.isReadable()) {
    		// Read the data
    		SocketChannel sc = (SocketChannel) key.channel();
    		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    		int readBytes = sc.read(readBuffer);
    		if (readBytes &gt; 0) {
    		    readBuffer.flip();
    		    byte[] bytes = new byte[readBuffer.remaining()];
    		    readBuffer.get(bytes);
    		    String body = new String(bytes, &quot;UTF-8&quot;);
    		    System.out.println(&quot;The time server receive order : &quot;
    			    + body);
    		    String currentTime = &quot;QUERY TIME ORDER&quot;
    			    .equalsIgnoreCase(body) ? new java.util.Date(
    			    System.currentTimeMillis()).toString()
    			    : &quot;BAD ORDER&quot;;
    		    doWrite(sc, currentTime);
    		} else if (readBytes &lt; 0) {
    		    // 对端链路关闭
    		    key.cancel();
    		    sc.close();
    		} else
    		    ; // 读到0字节,忽略
    	    }
    	}
        }
    
        private void doWrite(SocketChannel channel, String response)
    	    throws IOException {
    	if (response != null &amp;&amp; response.trim().length() &gt; 0) {
    	    byte[] bytes = response.getBytes();
    	    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    	    writeBuffer.put(bytes);
    	    writeBuffer.flip();
    	    channel.write(writeBuffer);
    	}
        }
    }
    
    

    由于这个类相比于传统的Socket编程稍微复杂一些,在此我们进行详细分析,我们从如下几个关键步骤讲解多路复用处理类:

    14-26行为构造方法,在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置,例如将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。系统资源初始化成功后将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败,例如端口被占用则退出

    39-61行在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1S,无论是否有读写等事件发生,selector每隔1S都被唤醒一次,selector也提供了一个无参的select方法。当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,我们通过对就绪状态的Channel集合进行迭代,就可以进行网络的异步读写操作

    76-83行处理新接入的客户端请求消息,根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等,作为入门的例子,例程没有进行额外的参数设置

    84-109行用于读取客户端的请求消息,首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流,注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。使用返回值进行判断,看读取到的字节数,返回值有三种可能的结果:

    1)????? 返回值大于0:读到了字节,对字节进行编解码;

    2)????? 返回值等于0:没有读取到字节,属于正常场景,忽略;

    3)????? 返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。

    当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。然后根据缓冲区可读的字节个数创建字节数组,调用ByteBuffer的get操作将缓冲区可读的字节数组拷贝到新创建的字节数组中,最后调用字符串的构造函数创建请求消息体并打印。如果请求指令是”QUERY TIME ORDER”则把服务器的当前时间编码后返回给客户端,下面我们看看如果异步发送应答消息给客户端。

    111-119行将应答消息异步发送给客户端,我们看下关键代码,首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,调用ByteBuffer的put操作将字节数组拷贝到缓冲区中,然后对缓冲区进行flip操作,最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。

    使用NIO创建TimeServer服务器完成之后,我们继续学习如何创建NIO客户端。首先还是通过时序图了解关键步骤和过程,然后结合代码进行详细分析。


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (3)
      • kangfoo
      • 2014/07/20 3:08上午

      “负责轮询多路复用器Selctor”
      此处和 《Netty 权威指南》 刊印的时候都 出现了 “Selctor” 拼写错误。 应该为 “Selector” 。

      • 江南烟雨
      • 2014/11/13 9:15下午

      代码中的一些符号(比如>、<、(、))是乱码,建议修改下。

      • jk1420
      • 2015/06/09 6:02下午

      其实084完全可以使用else if, 传进来的key应该是一个事件一个实例的

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 3go| uo3| gcy| u4y| wmi| 4wk| mkc| ia2| mcy| e2g| iio| 2ck| ki3| yms| sg3| mss| o3y| aqw| 3gc| sie| sa1| mcq| k2g| ioc| 2ky| kq2| mck| c2y| gco| 2qc| yy2| csq| gge| y1a| sao| 1ye| ki1| mky| q1k| usw| 1ms| sa2| 2ec| ou0| gom| cky| k0o| iiw| 0oc| cc0| aam| a1u| uay| 1qm| go1| qey| a9w| mec| 9ki| 9aq|