首页  

Kafka 时间轮 TimingWheel     所属分类 kafka 浏览量 46
时间轮是一个环形的队列(底层一般基于数组实现),队列中的每一个元素(时间格)都可以存放一个定时任务列表。
时间轮中的每个时间格代表了时间轮的基本时间跨度或者说时间精度
一个有 12 个时间格的时间轮,转完一圈需要 12 s ,
新建一个 3s 后执行的定时任务,将定时任务放在下标为 3 的时间格中即可,
创建一个 13s 后执行的定时任务,引入一个叫做 圈数/轮数 的概念,
这个任务还是放在下标为 3 的时间格中, 不过它的圈数为 2 ,
除了增加圈数这种方法,还有一种 多层次时间轮 (类似手表), Kafka 采用的就是这种方案。

相比于常用的DelayQueue的时间复杂度O(logN),TimingWheel的数据结构在插入任务时只要O(1),获取到达任务的时间复杂度也远低于O(logN)。

kafka 默认实现,tickMs=1Ms,wheelSize=20 
第一层时间轮能表示时间范围是0~20Ms
第二层时间轮每一个槽所能表示的时间是第一层时间轮所能表示的时间范围,能表示的时间范围是0~400Ms


在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,
这些请求一般都会带有timeout参数,表示如果timeout时间后服务端还不满足返回的条件,
就判定此次请求为超时,这时候kafka同样要返回超时的响应给客户端,这样客户端才知道此次请求超时了。
比如ack=-1的producer请求,需要等待所有的isr备份完成了才可以返回给客户端,或者到达timeout时间了返回超时响应给客户端。

上面的场景,可以用延迟任务来实现。也就是定义一个任务,在timeout时间后执行,
执行的内容一般就是先检查返回条件是否满足,满足的话就返回客户端需要的响应,如果不满足,就发送超时响应给客户端。

对于延迟操作,java自带的实现有Timer和ScheduledThreadPoolExecutor。
这两个的底层数据结构都是基于一个延迟队列,在准备执行一个延迟任务时,将其插入到延迟队列中。
这些延迟队列其实就是一个用最小堆实现的优先级队列,因此,插入一个任务的时间复杂度是O(logN),取出一个任务执行后调整堆的时间也是O(logN)。

如果要执行的延迟任务不多,O(logN)的速度已经够快了。
为了追求更快的速度,使用 Timing Wheel 数据结构, 任务的插入时间复杂度为O(1)


private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
}


tickMs:表示一个槽所代表的时间范围,kafka的默认值的1ms
wheelSize:表示该时间轮有多少个槽,kafka的默认值是20
startMs:表示该时间轮的开始时间
taskCounter:表示该时间轮的任务总数
queue:TimerTaskList的延迟队列
每个槽都有它一个对应的TimerTaskList,TimerTaskList是一个双向链表,有一个expireTime的值,这些TimerTaskList都被加到这个延迟队列中,expireTime最小的槽会排在队列的最前面。

interval:时间轮所能表示的时间跨度,也就是tickMs*wheelSize
buckets:表示TimerTaskList的数组,即各个槽。
currentTime:表示当前时间,也就是时间轮指针指向的时



运行原理 新增一个延迟任务时,通过buckets[expiration / tickMs % wheelSize]先计算出它应该属于哪个槽。 比如延迟任务的delayMs=2ms,当前时间currentTime是0ms,则expiration=delayMs+startMs=2ms,它应该落于2号槽。 并把任务封装成TimerTaskEntry然后加入到TimerTaskList链表中。 之后,kafka会启动一个线程,去推动时间轮的指针转动。 其实现原理其实就是通过queue.poll()取出放在最前面的槽的TimerTaskList。 由于queue是一个延迟队列,如果队列中的expireTime没有到达,该操作会阻塞住,直到expireTime到达。 如果通过queue.poll()取到了TimerTaskList,说明该槽里面的任务时间都已经到达。 这时候就可以遍历该TimerTaskList中的任务,然后执行对应的操作了。 时间溢出处理 在kafka的默认实现中,tickMs=1Ms,wheelSize=20,这就表示该时间轮所能表示的延迟时间范围是0~20Ms, 如果延迟时间超过20Ms , 使用层级时间轮。 第一层的时间轮所能表示时间范围是0~20Ms之间,假设现在出现一个任务的延迟时间是200Ms,那么kafka会再创建一层时间轮,称之为第二层时间轮。 第二层时间轮 overflowWheel = new TimingWheel( tickMs = interval, wheelSize = wheelSize, startMs = currentTime, taskCounter = taskCounter, queue ) 第二层时间轮每一个槽所能表示的时间是第一层时间轮所能表示的时间范围,也就是20Ms 槽的数量还是一样,其他的属性也是继承自第一层时间轮,这时第二层时间轮所能表示的时间范围就是0~400Ms了 如果第二层时间轮的时间范围还容纳不了新的延迟任务,就会创建第三层、第四层

上一篇     下一篇
Quartz简介

quartz countJob 实例

quartz 配置文件 quartz.properties

mysql 悲观锁

quartz.properties配置文件详解

java 技术学习面试指南