Java NIO写事件处理技巧
所属分类 nio
浏览量 965
Socket发送缓冲区未满时都是可写的 , 一般不注册写事件
while (buf.hasRemaining()) {
int len = socketChannel.write(buf);
if (len < 0) {
throw new EOFException();
}
}
大部分情况都没问题,但是高并发,并且在网络环境很差的情况下,发送缓冲区可能会满,导致无限循环
netty 通过writeSpinCount来控制尝试写的次数,如果最终还是无法写入,就注册写事件
Grizzly 与 netty处理类似
尝试多次失败后 ,在重新构造selector上注册写事件,并且通过select()来阻塞一定的时间来等待可写
netty
protected void write0(AbstractNioChannel> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel);
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final Queue writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
List causes = null;
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
// 如果无数据可写,则需要删除可写事件的注册
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}
long localWrittenBytes = 0;
// 通过writeSpinCount来控制尝试写的次数,如果最终还是无法写入,就注册写事件
for (int i = writeSpinCount; i > 0; i --) {
// 写数据
localWrittenBytes = buf.transferTo(ch);
// 如果写入数据不等于零,表明写入成功,跳出循环
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
// 如果buf的数据都写完了,则跳出循环
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (writtenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
}
}
channel.inWriteNowLoop = false;
if (open) {
if (addOpWrite) {
// 注册写事件
setOpWrite(channel);
} else if (removeOpWrite) {
// 删除写事件
clearOpWrite(channel);
}
}
}
}
netty
public static long flushChannel(SocketChannel socketChannel, ByteBuffer bb, long writeTimeout)
throws IOException {
SelectionKey key = null;
Selector writeSelector = null;
int attempts = 0;
int bytesProduced = 0;
try {
while (bb.hasRemaining()) {
int len = socketChannel.write(bb);
// 类似Netty的spinCount
attempts++;
if (len < 0) {
throw new EOFException();
}
bytesProduced += len;
if (len == 0) {
if (writeSelector == null) {
// 获取一个新的selector
writeSelector = SelectorFactory.getSelector();
if (writeSelector == null) {
// Continue using the main one
continue;
}
}
// 在新selector上注册写事件,而不是在主selector上注册
key = socketChannel.register(writeSelector, key.OP_WRITE);
// 利用writeSelector.select()来阻塞当前线程,等待可写事件发生,总共等待可写事件的时长是3*writeTimeout
if (writeSelector.select(writeTimeout) == 0) {
if (attempts > 2)
throw new IOException("Client disconnected");
} else {
attempts--;
}
} else {
attempts = 0;
}
}
}
return bytesProduced;
}
上一篇
下一篇
NIO概述
第一个 NIO server 例子
Java NIO 注意点
java NIO buffer
Java NIO pipe
java NIO FileChannel