Netty EventLoop
所属分类 netty
浏览量 109
Netty EventLoop 用于处理I/O事件
它是一个单线程执行器,同时维护了一个Selector,用于处理Channel上的IO事件。
EventLoop继承自EventExecutor和ScheduledExecutorService,因此可以直接将任务交给EventLoop执行
每个 EventLoop 都有一个任务队列(TaskQueue),用于存放需要延迟执行或异步执行的任务。
这些任务可以是定时任务、I/O操作、用户自定义的任务等
任务队列的主要用途包括:
用户程序自定义的普通任务:这些任务通常是由用户提交的,用于处理一些非I/O相关的业务逻辑。
用户自定义定时任务:这些任务用于在指定的时间点执行某些操作,例如定时发送心跳包或定期清理资源。
非当前Reactor线程调用Channel的各种方法:在某些情况下,可能需要在非当前的Reactor线程中调用Channel的方法,这时可以通过任务队列来异步执行这些操作
网络事件、普通事件、延时事件
Netty的任务队列是一个基于优先级的任务队列,支持两种类型的任务:用户任务(User Task)和IO任务(IO Task)。
用户任务是由用户提交的,而IO任务是由Netty内部生成的,用于处理IO事件。
TaskQueue通过优先级队列实现,确保了高优先级的任务能够优先得到处理。
为了避免阻塞当前I/O线程,建议使用专门的EventLoopExecutor来处理长时间运行的任务或需要阻塞的操作 ,
每个EventLoop都有自己的任务队列,独立于其他EventLoop,这样可以确保任务的执行不会相互干扰。
Execute task in EventLoop
while (!terminated) {
// 1 阻塞直到有事件可以运行
List< Runnable> readyEvents = blockUntilEventsReady();
for (Runnable ev: readyEvents) {
// 2 循环所有事件,并运行他们
ev.run();
}
}
select() 阻塞方法,监听网络事件
processSelectedKeys() 分发处理网络事件
this.runAllTasks() 执行队列里面的任务
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector selector; // 多路选择器
private final Queue< Runnable> taskQueue; // 普通任务队列
PriorityQueue < ScheduledFutureTask < ? > > scheduledTaskQueue; // 定时任务队列
private volatile int ioRatio = 50; // 执行网络事件和任务队列的时间比例
protected void run() {
int selectCnt = 0;
for (;;) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 执行网络事件
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//执行普通事件+定时事件
}}}}
protected boolean runAllTasks(long timeoutNanos) {
// 从定时任务队列里取出任务放入普通任务队列
fetchFromScheduledTaskQueue();
// 从任务队列中取出待执行任务
Runnable task = pollTask();
// 算出这里可以执行任务的时间
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 开始执行任务,这里捕捉了任务异常
safeExecute(task);
runTasks ++;
// 每执行0x3F=64个任务开始检测超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask(); // 取下一个任务,没有任务,记录时间后退出
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
无锁化 每一个channel绑定一个线程,对channel的操作都放在这个线程中执行
多个线程写入channel
判断当前线程是否为事件循环线程,
如果不是,将操作封装为 runnable 或 callable 对象 ,放入channel绑定的事件循环的任务队列中执行
上一篇
下一篇
netty ByteBuf 种类
netty ByteToMessageDecoder
netty解码器实例
netty VS mina
热锅冷油,炒菜不粘锅的科学原理
netty Event Handler 和 Pipeline