概述
java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
使用DelayQueue需要实现Delayed接口并实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法。
消息类实体
package com.smartirhd.basics.controller;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时消息
* @author LiMingQiang
*/
public class DelayedWeiboMessage implements Delayed {
/**
* 触发时间
*/
private long executeTime;
/**
* 追踪ID
*/
private String traceId;
/**
* URL
*/
private String url;
/**
*
* @param traceId
* @param url
* @param delayTime 延迟时间(秒)
*/
public DelayedWeiboMessage(String traceId, String url, long delayTime) {
//执行时间 = 当前时间 + delayTime(延迟时间)
this.executeTime = System.currentTimeMillis() + (delayTime * 1000);
this.traceId = traceId;
this.url = url;
}
public long getExcuteTime() {
return executeTime;
}
public void setExcuteTime(long excuteTime) {
this.executeTime = excuteTime;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
/**
* 返回剩余延迟时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
//判断excuteTime是否大于当前系统时间,并将结果转换成MILLISECONDS
long diffTime = this.executeTime - System.currentTimeMillis();
return unit.convert(diffTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//对队列中的消息进行排序
DelayedWeiboMessage dwm = (DelayedWeiboMessage) o;
return (int) (this.executeTime - dwm.getExcuteTime());
}
}
生产和消费
//定义队列
public final static DelayQueue<DelayedWeiboMessage> WEIBO_QUEUE = new DelayQueue<>();
//消息生产,直接put消息
WEIBO_QUEUE.put(new DelayedWeiboMessage(traceId, url, 60));
//消息消费,持续
@PostConstruct
public void init(){
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new CustomizableThreadFactory("weibo-click-Thread-pool-"),
new ThreadPoolExecutor.DiscardPolicy());
poolExecutor.execute(() -> {
//回传失败消息
while (true){
try {
DelayedWeiboMessage dwm = WEIBO_QUEUE.take();
log.info("traceId:{},请求重试!!!", dwm.getTraceId());
//业务逻辑
...
...
...
Thread.sleep(50);
} catch (InterruptedException e) {
log.warn("回传激活消息消费失败,不再重试!!!");
}
}
});
}