java NIO 的最佳实践( 二 )

方案三: 一直订阅OP_WRITE,socketChannel主动写
while (channel.isOpen()) { //这里与方案一有区别 可以直接阻塞 int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { ...缓冲区已写数据清理SelectionKey selectionKey = iterator.next(); iterator.remove(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); if (selectionKey.isConnectable()) { if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } //订阅读/写事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } if (selectionKey.isReadable()) { ...读事件处理 } if (selectionKey.isWritable()) { //改为主动读取式 ByteBuffer byteBuffer = awaitGetWrite(writeBuffer, 30, 50); if (byteBuffer != null) { int write = channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + write); if (write != byteBuffer.limit()) { System.out.print("a"); } } } } } }/** * 等待获取写缓存 * @param byteBuf 缓冲区 * @param ms 缓冲时间 防止空转 * @param cap 阈值:超过则直接返回,没超过等待ms后判断是否超过阈值 * @return */ public ByteBuffer awaitGetWrite(ByteBuf byteBuf, long ms, int cap) { //缓冲大小 不要过大就行 自己调整 int socketCap = 1024 * 30; if (byteBuf.readableBytes() >= cap) {//>=cap直接返回 return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else {//<cap时等待 CountDownLatch countDownLatch = new CountDownLatch(1); try { countDownLatch.await(ms, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if (byteBuf.readableBytes() > 0) { return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else { return null; } } }优点缺点方案1当网络拥堵时,不尝试写数据需要自己控制订阅/取消订阅的时机方案2不关心网络拥堵,只要有数据就尝试写,当网络拥堵时做大量无用功编写方便,无需关心OP_WRITE事件订阅时机方案3相比方案1 编码复杂度下降
综合上述个人觉得还是方案3比较好
channel.write()写数据问题
现象: 网络拥堵时,cpu占用超高
原因: 网络拥堵时, channel.write()一直写不进去,导致while()空转
采取上一问题方案3可以避免该问题
writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear();分析: 当网络拥堵时,channel.write()可能写入0数据,而这里采用死循环写入数据,假如一直写不进去就会导致空转
最佳实践:
while (writeBuffer.isReadable()) {//这里使用的是netty的ByteBuf ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//无法全部写入到socket缓冲区中,说明socket缓冲区已满,可能发生空转 break System.err.print("a"); //防止空转 依赖外层循环重新进入break; }} 结合OP_WRITE订阅时机问题,可以得知方案一的临时订阅OP_WRITE事件方式,能更好的防止channel.write(byteBuffer)空转
TCP断开判断
现象: 当TCP一方断开时,另一方cpu占用超高
原因: 当TCP一方断开时,一直会触发OP_READ,导致空转.
分析: 当TCP一方断开时,触发OP_READ,socketChannel.read(readBuffer)返回-1,表示对方连接已断开,自己也需要断开连接socketChannel.close(),否则会一直触发OP_READ,导致空转
while (true) { int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false);//The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream int read = socketChannel.read(readBuffer); readBuffer.flip();//读到-1 没有处理 导致空转 if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } }... } } }复制代码最佳实践:
if (selectionKey.isReadable()) { ByteBuffer readBuffer = Server.SocketContext.get(socketChannel).getReadBuffer(); int read = socketChannel.read(readBuffer); readBuffer.flip(); if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } else if (read == -1) {//对面已断开 close System.out.println("断开..." + socketChannel.socket().getRemoteSocketAddress()); socketChannel.close(); } }


推荐阅读