DelayQueue延时队列
Java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。如果没有过期元素,使用poll()
方法会返回null
值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)
方法的返回值小于等于0来判断。延时队列不能存放空元素。
使用DelayQueue
需要实现Delayed
接口并实现getDelay(TimeUnit unit)
方法和compareTo(Delayed o)
方法。
注意
这是一个在内存使用的队列,如果程序重启、宕机等会造成消息丢失,所以还需综合衡量后方可使用
消息类
java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayedWeiboMessage implements Delayed {
/** 触发时间 */
private long executeTime;
/** 追踪ID */
private String traceId;
/** URL */
private String url;
public long getExcuteTime() { return executeTime; }
public void setExcuteTime(long excuteTime) { this.executeTime = excuteTime; }
public String getTraceId() { return traceId; }
public void setTraceId(String traceId) { this.traceId = traceId; }
public String getUrl() { return url; }
public void setUrl(String url) { this.url = url; }
/**
* 构造函数
* @param traceId 追踪ID,用于记录日志,排查错误
* @param url 业务处理需要用到的数据
* @param delayTime 延迟时间(秒)
*/
public DelayedWeiboMessage(String traceId, String url, long delayTime) {
//执行时间 = 当前时间 + delayTime(延迟时间)
this.executeTime = System.currentTimeMillis() + (delayTime * 1000);
this.traceId = traceId;
this.url = url;
}
/**
* 返回剩余延迟时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
//判断excuteTime是否大于当前系统时间,并将结果转换成MILLISECONDS
long diffTime = this.executeTime - System.currentTimeMillis();
return unit.convert(diffTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//对队列中的消息进行排序
DelayedWeiboMessage dwm = (DelayedWeiboMessage) o;
return (int) (this.executeTime - dwm.getExcuteTime());
}
}
生产消息
java
//定义一个全局访问的队列
public final static DelayQueue<DelayedWeiboMessage> WEIBO_QUEUE = new DelayQueue<>();
//当需要产生一条消息时,在业务代码中添加消息到队列中。60是延迟的时间,例子中的时间单位是秒
WEIBO_QUEUE.put(new DelayedWeiboMessage(traceId, url, 60));
消费消息
java
//这里是定义一个只有 1个线程的线程池去消费消息,那么这个线程池就不需要shutDown了
public final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new CustomizableThreadFactory("custom-Thread-pool-"),
new ThreadPoolExecutor.DiscardPolicy());
static {
//在程序启动时,这个消费处理就可以执行了
poolExecutor.execute(() -> {
//回传失败消息
while (true){
try {
DelayedWeiboMessage dwm = WEIBO_QUEUE.take();
log.info("traceId:{},请求重试!!!", dwm.getTraceId());
//业务逻辑
...
...
...
Thread.sleep(50);
} catch (InterruptedException e) {
log.warn("回传激活消息消费失败,不再重试!!!");
}
}
});
}