简单说说Kafka中的时间轮算法 转载
一、前言
由于工作的需要,得实现一个用于控制事件超时抛弃的时间轮,由于这是一个相对独立的接口,就总结分享一下。
首先看下需求,此时间轮需要具备下面几个功能:
1)能添加事件,同时附上其超时时间;
2)如果事件正常执行结束,可以显示将其从时间轮上剔除掉,而不需要等时间轮自动移除;
3)如果事件到了设定的超时时间还没执行完,则时间轮需将其剔除掉,并发送一个超时的消息给系统。
基于这样的需求,下面就进行相应的设计和实现。
二、时间轮的设计
基于前面的需求,可以抽象出两个实体来:时钟和槽,其中时钟去负责指针的走动、获取等,而槽用来存储落在各个时间点的事件。同时还需要有三个行为:添加,正常执行的删除,超时的删除和发送消息。有了这样的抽象,首先设计了两个类Clocker和Slot,分别代表时钟和槽;对于几种行为,则需要放到一个主控制类中,设计为TimeWheel,将接口暴露给外部调用。这样系统的静态结构就出来了。
注意到,当事件超时后,需要发送消息出去,以供事件执行相关的处理,这里采用一种面向接口的编程方式。我们先定义一个接口Expiration,里面只有一个方法expired方法。具体的事件需要实现这个接口,这样在我们的时间轮中,只要超时,直接以传入的事件来调用expired方法即能起到发送消息的目的!
零、时间轮定义
简单说说时间轮吧,它是一个高效的延时队列,或者说定时器。实际上现在网上对于时间轮算法的解释很多,定义也很全,这里引用一下朱小厮博客里出现的定义:
参考下图,Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形中间件的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。
如果你理解了上面的定义,那么就不必往下看了。但如果你第一次看到和我一样懵比,并且有不少疑问,那么这篇博文将带你进一步了解时间轮,甚至理解时间轮算法。
如果有兴趣,可以去看看其他的定时器 你真的了解延时队列吗。博主认为,时间轮定时器最大的优点:
是任务的添加与移除,都是O(1)级的复杂度;
不会占用大量的资源;
只需要有一个线程去推进时间轮就可以工作了。
我们将对时间轮做层层推进的解析:
一、为什么使用环形队列
假设我们现在有一个很大的数组,专门用于存放延时任务。它的精度达到了毫秒级!那么我们的延迟任务实际上需要将定时的那个时间简单转换为毫秒即可,然后将定时任务存入其中:
比如说当前的时间是2018/10/24 19:43:45,那么就将任务存入Task[1540381425000],value则是定时任务的内容。
private Task[很长] tasks;
public List getTaskList(long timestamp) {
return task.get(timestamp)
}
// 假装这里真的能一毫秒一个循环
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后台执行()
Thread.sleep(1);
}
}
假如这个数组长度达到了亿亿级,我们确实可以这么干。 那如果将精度缩减到秒级呢?我们也需要一个百亿级长度的数组。
先不说内存够不够,显然你的定时器要这么大的内存显然很浪费。
当然如果我们自己写一个map,并保证它不存在hash冲突问题,那也是完全可行的。(我不确定我的想法是否正确,如果错误,请指出)
/* 一个精度为秒级的延时任务管理类 */
private Map taskMap;
public List getTaskList(long timestamp) {
return taskMap.get(timestamp - timestamp % 1000)
}
// 新增一个任务
public void addTask(long timestamp, Task task) {
List taskList = getTaskList(timestamp - timestamp % 1000);
if (taskList == null){
taskList = new ArrayList();
}
taskList.add(task);
}
// 假装这里真的能一秒一个循环
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后台执行()
Thread.sleep(1000);
}
}
其实时间轮就是一个不存在hash冲突的数据结构
抛开其他疑问,我们看看手腕上的手表(如果没有去找个钟表,或者想象一个),是不是无论当前是什么时间,总能用我们的表盘去表示它(忽略精度)
就拿秒表来说,它总是落在 0 - 59 秒,每走一圈,又会重新开始。
用伪代码模拟一下我们这个秒表:
private Bucket[60] buckets;// 表示60秒
public void addTask(long timestamp, Task task) {
Bucket bucket = buckets[timestamp / 1000 % 60];
bucket.add(task);
}
public Bucket getBucket(long timestamp) {
return buckets[timestamp / 1000 % 60];
}
// 假装这里真的能一秒一个循环
public void run(){
while (true){
getBucket(System.currentTimeMillis()).后台执行()
Thread.sleep(1000);
}
}
这样,我们的时间总能落在0 - 59任意一个bucket上,就如同我们的秒钟总是落在0 - 59刻度上一样,这便是时间轮Epxress的环形队列。
二、表示的时间有限
但是细心的小伙伴也会发现这么一个问题:如果只能表示60秒内的定时任务应该怎么存储与取出,那是不是太有局限性了?如果想要加入一小时后的延迟任务,该怎么办?
其实还是可以看一看钟表,对于只有三个指针的表(一般的表)来说,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。如果我们加多几个指针呢?比如说我们有秒针,分针,时针,上下午针,天针,月针,年针...... 那不就能表示很长很长的一段时间了?而且,它并不需要占用很大的内存。
比如说秒针我们可以用一个长度为60的数组来表示,分针也同样可以用一个长度为60的数组来表示,时针可以用一个长度为24的数组来表示。那么表示一天内的所有时间,只需要三个数组即可。
动手来做吧,我们将这个数据结构称作时间轮,tickMs表示一个刻度,比如说上面说的一秒。wheelSize表示一圈有多少个刻度,即上面说的60。interval表示一圈能表示多少时间,即 tickMs * wheelSize = 60秒。
overflowWheel表示上一层的时间轮,比如说,对于秒钟来说,overflowWheel就表示分钟,以此类推。
public class TimeWheel {
/** 一个时间槽的时间 */
private long tickMs;
/** 时间轮大小 */
private int wheelSize;
/** 时间跨度 */
private long interval;
/** 槽 */
private Bucket[] buckets;
/** 时间轮指针 */
private long currentTimestamp;
/** 上层时间轮 */
private volatile TimeWheel overflowWheel;
public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
this.currentTimestamp = currentTimestamp;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new Bucket[wheelSize];
this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new Bucket();
}
}
}
将任务添加到时间轮中十分简单,对于每个时间轮来说,比如说秒级时间轮,和分级时间轮,都有它自己的过期槽。也就是delayMs < tickMs的时候。
添加延时任务的时候一共就这几种情况:
####一、时间到期
1)比如说有一个任务要在 16:29:07 执行,从秒级时间轮中来看,当我们的当前时间走到16:29:06的时候,则表示这个任务已经过期了。因为它的delayMs = 1000ms,小于了我们的秒级时间轮的tickMs(1000ms)。
比如说有一个任务要在 16:41:25 执行,从分级时间轮中来看,当我们的当前时间走到 16:41的时候(分级时间轮没有秒针!它的最小精度是分钟(一定要理解这一点)),则表示这个任务已经到期,因为它的delayMs = 25000ms,小于了我们的分级时间轮的tickMs(60000ms)。
二、时间未到期,且delayMs小于interval。
对于秒级时间轮来说,就是延迟时间小于60s,那么肯定能找到一个秒钟槽扔进去。
三、时间未到期,且delayMs大于interval。
对于妙级时间轮来说,就是延迟时间大于等于60s,这时候就需要借助上层时间轮的力量了,很简单的代码实现,就是拿到上层时间轮,然后类似递归一样,把它扔进去。
比如说一个有一个延时为一年后的定时任务,就会在这个递归中不断创建更上层的时间轮,直到找到满足delayMs小于interval的那个时间轮。
这里为了不把代码写的那么复杂,我们每一层时间轮的刻度都一样,也就是秒级时间轮表示60秒,上面则表示60分钟,再上面则表示60小时,再上层则表示60个60小时,再上层则表示60个60个60小时 = 216000小时。
也就是如果将最底层时间轮的tickMs(精度)设置为1000ms。wheelSize设置为60。那么只需要5层时间轮,可表示的时间跨度已经长达24年(216000小时)。
/**
* 添加任务到某个时间轮
*/
public boolean addTask(TimedTask timedTask) {
long expireTimestamp = timedTask.getExpireTimestamp();
long delayMs = expireTimestamp - currentTimestamp;
if (delayMs < tickMs) {// 到期了
return false;
} else {
// 扔进当前时间轮的某个槽中,只有时间【大于某个槽】,才会放进去
if (delayMs < interval) {
int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
Bucket bucket = buckets[bucketIndex];
bucket.addTask(timedTask);
} else {
// 当maybeInThisBucket大于等于wheelSize时,需要将它扔到上一层的时间轮
TimeWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timedTask);
}
}
return true;
}
/**
* 获取或创建一个上层时间轮
*/
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
}
}
}
return overflowWheel;
}
当然我们的时间轮还需要一个指针的推进机制,总不能让时间永远停留在当前吧?推进的时候,同时类似递归,去推进一下上一层的时间轮。
注意:要强调一点的是,我们这个时间轮更像是电子表,它不存在时间的中间状态,也就是精度这个概念一定要理解好。比如说,对于秒级时间轮来说,它的精度只能保证到1秒,小于1秒的,都会当成是已到期
对于分级时间轮来说,它的精度只能保证到1分,小于1分的,都会当成是已到期
/**
* 尝试推进一下指针
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTimestamp + tickMs) {
currentTimestamp = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
this.getOverflowWheel()
.advanceClock(timestamp);
}
}
}
三、对于高层时间轮来说,精度越来越不准,会不会有影响?
上面说到,分级时间轮,精度只有分钟级,总不能延迟1秒的定时任务和延迟59秒的定时任务同时执行吧?
有这个疑问的同学很好!实际上很好解决,只需再入时间轮即可。比如说,对于分钟级时间轮来说,delayMs为1秒和delayMs为59秒的都已经过期,我们将其取出,再扔进底层的时间轮不就可以了?
1秒的会被扔到秒级时间轮的下一个执行槽中,而59秒的会被扔到秒级时间轮的后59个时间槽中。
细心的同学会发现,我们的添加H5移动开发任务方法,返回的是一个bool
public boolean addTask(TimedTask timedTask)
再倒回去好好看看,添加到最底层时间轮失败的(我们只能直接操作最底层的时间轮,不能直接操作上层的时间轮),是不是会直接返回flase?对于再入失败的任务,我们直接执行即可。
/**
* 将任务添加到时间轮
*/
public void addOrSubmitTask(TimedTask timedTask) {
if (!timeWheel.addTask(timedTask)) {
taskExecutor.submit(timedTask.getTask());
}
}
四、如何知道一个任务已经过期?
记得我们将任务存储在槽中嘛?比如说秒级时间轮中,有60个槽,那么一共有60个槽。如果时间轮共有两层,也仅仅只有120个槽。我们只需将槽扔进一个delayedQueue之中即可。
我们轮询地从delayedQueue取出已经过期的槽即可。(前面的所有代码,为了简单说明,并没有引入这个DelayQueue的概念,所以不用去上面翻了,并没有。博主觉得...已经看到这里了,应该很明白这个DelayQueue的意义了。)
其实简单来说,实际上定时任务单单使用DelayQueue来实现,也是可以的,但是一旦任务的数量多了起来,达到了百万级,千万级,针对这个delayQueue的增删,将非常的慢。
对于Clocker类,由于它要控制着时间的不停走动,得是一个单独的线程去完成。其中时间的走动,最好不用Thread.sleep去模拟;可以用一个空的阻塞队列,然后每次调用poll方法,设置间隔时间为超时时间,这样的效果会更好。对于一个新事件,带着超时时间来,需要找到其应在的指针位置,通常的做法就是按超时时间除以预定义好的间隔时间,获取一个偏移量,加上当前位置即可。
对于主类TimeWheel,由于在事件正常结束的时候,可以让事件主动从时间轮中将自己删除掉,因此其add方法就需要返回一个事件所在slot的位置;这样在remove的时候,连带位置信息,就可以方便找到坐在的slot,从中删除该事件。
当超时时间到时,需要将对应的slot中所有元素清除掉,这个工作可以在Slot类中完成,即将对应槽中所有元素取出来放到临时变量中,将当前槽清空,这样就能保证槽中不会有事件积压。对于取出的元素,调用对应的expired方法即可。
基于这样的设计,已经能够满足系统的需要了,并且运行的较为良好。
** 一、面向槽的delayQueue**
而对于时间轮来说,它只需要往delayQueue里面扔各种槽即可,比如我们的定时任务长短不一,最长的跨度到了24年,这个delayQueue也仅仅只有300个元素。
** 二、处理过期的槽**
而这个槽到期后,也就是被我们从delayQueue中poll出来后,我们只需要将槽中的所有任务循环一次,重新加到新的槽中(添加失败则直接执行)即可。
/**
* 推进一下时间轮的指针,并且将delayQueue中的任务取出来再重新扔进去
*/
public void advanceClock(long timeout) {
try {
Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (bucket != null) {
timeWheel.advanceClock(bucket.getExpire());
bucket.flush(this::addTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
五、优化
对于上面的设计,细想会有一个可改造的点,就是对于超时元素执行expired方法是在TimeWheel线程里执行的,这样会有问题:
1)不确定expired方法执行所需要花费的时间,这样就有可能影响主线程;
2)如果expired方法出现异常,主线程可能会受到影响。
基于这样的考虑,我们可以把这些事件放到一个阻塞队列里,然后另起一个线程,专门去队列中取元素,执行expired方法。这样就很好的将expired操作和主线程隔离了开来。对于这样的设计,也就是典型的生产者-消费者模型,主线程TimeWheel负责生产数据,设计一个Release线程负责消费数据,仓库用阻塞队列承担,这样就较好的解决了这个问题,起到了优化的作用。
完整的时间轮GitHub,其实就是半抄半自己撸的Kafka时间轮简化版 Timer#main 中模拟了六百万个简单的延时任务,执行的效率很高 ~
原文出处:Anur
- 上一篇: 快速了解 mpvue 开发小程序
- 下一篇: SpringMVC在SpringBoot中的意义