Skip to content

DelayQueue延时队列

Java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。如果没有过期元素,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
使用DelayQueue需要实现Delayed接口并实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法。

注意

这是一个在内存使用的队列,如果程序重启、宕机等会造成消息丢失,所以还需综合衡量后方可使用

消息类

java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedWeiboMessage implements Delayed {
 
    /** 触发时间 */
    private long executeTime;
    /** 追踪ID */
    private String traceId;
    /** URL */
    private String url;

    public long getExcuteTime() { return executeTime; }
    public void setExcuteTime(long excuteTime) { this.executeTime = excuteTime; }
    public String getTraceId() { return traceId; }
    public void setTraceId(String traceId) { this.traceId = traceId; }
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
 
    /**
     * 构造函数
     * @param traceId 追踪ID,用于记录日志,排查错误
     * @param url 业务处理需要用到的数据
     * @param delayTime 延迟时间(秒)
     */
    public DelayedWeiboMessage(String traceId, String url, long delayTime) {
        //执行时间 = 当前时间 + delayTime(延迟时间)
        this.executeTime = System.currentTimeMillis() + (delayTime * 1000);
        this.traceId = traceId;
        this.url = url;
    }
 
    /**
     * 返回剩余延迟时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        //判断excuteTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime = this.executeTime - System.currentTimeMillis();
        return unit.convert(diffTime, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
    	//对队列中的消息进行排序
        DelayedWeiboMessage dwm = (DelayedWeiboMessage) o;
        return (int) (this.executeTime - dwm.getExcuteTime());
    }
}

生产消息

java
//定义一个全局访问的队列
public final static DelayQueue<DelayedWeiboMessage> WEIBO_QUEUE = new DelayQueue<>();

//当需要产生一条消息时,在业务代码中添加消息到队列中。60是延迟的时间,例子中的时间单位是秒
WEIBO_QUEUE.put(new DelayedWeiboMessage(traceId, url, 60));

消费消息

java
//这里是定义一个只有 1个线程的线程池去消费消息,那么这个线程池就不需要shutDown了
public final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new CustomizableThreadFactory("custom-Thread-pool-"),
            new ThreadPoolExecutor.DiscardPolicy());

static {
    //在程序启动时,这个消费处理就可以执行了
    poolExecutor.execute(() -> {
        //回传失败消息
        while (true){
            try {
                DelayedWeiboMessage dwm = WEIBO_QUEUE.take();
                log.info("traceId:{},请求重试!!!", dwm.getTraceId());
                //业务逻辑
                ...
                ...
                ...
                Thread.sleep(50);
            } catch (InterruptedException e) {
                log.warn("回传激活消息消费失败,不再重试!!!");
            }
        }
    });
}