首页  

Netty EventLoop     所属分类 netty 浏览量 54
Netty EventLoop 用于处理I/O事件
它是一个单线程执行器,同时维护了一个Selector,用于处理Channel上的IO事件。
EventLoop继承自EventExecutor和ScheduledExecutorService,因此可以直接将任务交给EventLoop执行

每个 EventLoop 都有一个任务队列(TaskQueue),用于存放需要延迟执行或异步执行的任务。
这些任务可以是定时任务、I/O操作、用户自定义的任务等


任务队列的主要用途包括:
用户程序自定义的普通任务:这些任务通常是由用户提交的,用于处理一些非I/O相关的业务逻辑。
用户自定义定时任务:这些任务用于在指定的时间点执行某些操作,例如定时发送心跳包或定期清理资源。
非当前Reactor线程调用Channel的各种方法:在某些情况下,可能需要在非当前的Reactor线程中调用Channel的方法,这时可以通过任务队列来异步执行这些操作

网络事件、普通事件、延时事件

Netty的任务队列是一个基于优先级的任务队列,支持两种类型的任务:用户任务(User Task)和IO任务(IO Task)。
用户任务是由用户提交的,而IO任务是由Netty内部生成的,用于处理IO事件。
TaskQueue通过优先级队列实现,确保了高优先级的任务能够优先得到处理。

为了避免阻塞当前I/O线程,建议使用专门的EventLoopExecutor来处理长时间运行的任务或需要阻塞的操作 ,
每个EventLoop都有自己的任务队列,独立于其他EventLoop,这样可以确保任务的执行不会相互干扰。

Execute task in EventLoop

while (!terminated) {
    // 1 阻塞直到有事件可以运行
    List< Runnable> readyEvents = blockUntilEventsReady();
    for (Runnable ev: readyEvents) {
        // 2 循环所有事件,并运行他们
        ev.run(); 
    }
}


select() 阻塞方法,监听网络事件 processSelectedKeys() 分发处理网络事件 this.runAllTasks() 执行队列里面的任务
public final class NioEventLoop extends SingleThreadEventLoop { private Selector selector; // 多路选择器 private final Queue< Runnable> taskQueue; // 普通任务队列 PriorityQueue < ScheduledFutureTask < ? > > scheduledTaskQueue; // 定时任务队列 private volatile int ioRatio = 50; // 执行网络事件和任务队列的时间比例 protected void run() { int selectCnt = 0; for (;;) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); // 执行网络事件 } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//执行普通事件+定时事件 }}}} protected boolean runAllTasks(long timeoutNanos) { // 从定时任务队列里取出任务放入普通任务队列 fetchFromScheduledTaskQueue(); // 从任务队列中取出待执行任务 Runnable task = pollTask(); // 算出这里可以执行任务的时间 final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0; long runTasks = 0; long lastExecutionTime; for (;;) { // 开始执行任务,这里捕捉了任务异常 safeExecute(task); runTasks ++; // 每执行0x3F=64个任务开始检测超时 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); // 取下一个任务,没有任务,记录时间后退出 if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
无锁化 每一个channel绑定一个线程,对channel的操作都放在这个线程中执行 多个线程写入channel 判断当前线程是否为事件循环线程, 如果不是,将操作封装为 runnable 或 callable 对象 ,放入channel绑定的事件循环的任务队列中执行

上一篇     下一篇
netty ByteBuf 种类

netty ByteToMessageDecoder

netty解码器实例

netty VS mina

热锅冷油,炒菜不粘锅的科学原理

netty Event Handler 和 Pipeline