首页  

Java NIO写事件处理技巧     所属分类 nio 浏览量 1014
Socket发送缓冲区未满时都是可写的 , 一般不注册写事件

while (buf.hasRemaining()) {
     int len = socketChannel.write(buf);
     if (len < 0) {
      throw new EOFException();
    }
}

大部分情况都没问题,但是高并发,并且在网络环境很差的情况下,发送缓冲区可能会满,导致无限循环


netty 通过writeSpinCount来控制尝试写的次数,如果最终还是无法写入,就注册写事件

Grizzly 与 netty处理类似
尝试多次失败后 ,在重新构造selector上注册写事件,并且通过select()来阻塞一定的时间来等待可写



netty protected void write0(AbstractNioChannel channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; boolean iothread = isIoThread(channel); long writtenBytes = 0; final SocketSendBufferPool sendBufferPool = this.sendBufferPool; final WritableByteChannel ch = channel.channel; final Queue writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); List causes = null; synchronized (channel.writeLock) { channel.inWriteNowLoop = true; for (;;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf = null; ChannelFuture future = null; try { if (evt == null) { if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { // 如果无数据可写,则需要删除可写事件的注册 removeOpWrite = true; channel.writeSuspended = false; break; } future = evt.getFuture(); channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); } else { future = evt.getFuture(); buf = channel.currentWriteBuffer; } long localWrittenBytes = 0; // 通过writeSpinCount来控制尝试写的次数,如果最终还是无法写入,就注册写事件 for (int i = writeSpinCount; i > 0; i --) { // 写数据 localWrittenBytes = buf.transferTo(ch); // 如果写入数据不等于零,表明写入成功,跳出循环 if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; } // 如果buf的数据都写完了,则跳出循环 if (buf.finished()) { break; } } if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; // Mark the event object for garbage collection. //noinspection UnusedAssignment evt = null; buf = null; future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; channel.writeSuspended = true; if (writtenBytes > 0) { // Notify progress listeners if necessary. future.setProgress( localWrittenBytes, buf.writtenBytes(), buf.totalBytes()); } break; } } } channel.inWriteNowLoop = false; if (open) { if (addOpWrite) { // 注册写事件 setOpWrite(channel); } else if (removeOpWrite) { // 删除写事件 clearOpWrite(channel); } } } }
netty public static long flushChannel(SocketChannel socketChannel, ByteBuffer bb, long writeTimeout) throws IOException { SelectionKey key = null; Selector writeSelector = null; int attempts = 0; int bytesProduced = 0; try { while (bb.hasRemaining()) { int len = socketChannel.write(bb); // 类似Netty的spinCount attempts++; if (len < 0) { throw new EOFException(); } bytesProduced += len; if (len == 0) { if (writeSelector == null) { // 获取一个新的selector writeSelector = SelectorFactory.getSelector(); if (writeSelector == null) { // Continue using the main one continue; } } // 在新selector上注册写事件,而不是在主selector上注册 key = socketChannel.register(writeSelector, key.OP_WRITE); // 利用writeSelector.select()来阻塞当前线程,等待可写事件发生,总共等待可写事件的时长是3*writeTimeout if (writeSelector.select(writeTimeout) == 0) { if (attempts > 2) throw new IOException("Client disconnected"); } else { attempts--; } } else { attempts = 0; } } } return bytesProduced; }

上一篇     下一篇
NIO概述

第一个 NIO server 例子

Java NIO 注意点

java NIO buffer

Java NIO pipe

java NIO FileChannel