小太阳的博客

DelayQueue延时队列

概述

java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。

使用DelayQueue需要实现Delayed接口并实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法。

消息类实体

package com.smartirhd.basics.controller;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 延时消息
 * @author LiMingQiang
 */
public class DelayedWeiboMessage implements Delayed {

    /**
     * 触发时间
     */
    private long executeTime;
    /**
     * 追踪ID
     */
    private String traceId;
    /**
     * URL
     */
    private String url;

    /**
     *
     * @param traceId
     * @param url
     * @param delayTime 延迟时间(秒)
     */
    public DelayedWeiboMessage(String traceId, String url, long delayTime) {
        //执行时间 = 当前时间 + delayTime(延迟时间)
        this.executeTime = System.currentTimeMillis() + (delayTime * 1000);
        this.traceId = traceId;
        this.url = url;
    }

    public long getExcuteTime() {
        return executeTime;
    }
    public void setExcuteTime(long excuteTime) {
        this.executeTime = excuteTime;
    }
    public String getTraceId() {
        return traceId;
    }
    public void setTraceId(String traceId) {
        this.traceId = traceId;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    /**
     * 返回剩余延迟时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        //判断excuteTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime = this.executeTime - System.currentTimeMillis();
        return unit.convert(diffTime, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
    		//对队列中的消息进行排序
        DelayedWeiboMessage dwm = (DelayedWeiboMessage) o;
        return (int) (this.executeTime - dwm.getExcuteTime());
    }
}

生产和消费

//定义队列
public final static DelayQueue<DelayedWeiboMessage> WEIBO_QUEUE = new DelayQueue<>();

//消息生产,直接put消息
WEIBO_QUEUE.put(new DelayedWeiboMessage(traceId, url, 60));

//消息消费,持续
@PostConstruct
public void init(){
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new CustomizableThreadFactory("weibo-click-Thread-pool-"),
            new ThreadPoolExecutor.DiscardPolicy());
    poolExecutor.execute(() -> {
        //回传失败消息
        while (true){
            try {
                DelayedWeiboMessage dwm = WEIBO_QUEUE.take();
                log.info("traceId:{},请求重试!!!", dwm.getTraceId());
                //业务逻辑
                ...
                ...
                ...
                Thread.sleep(50);
            } catch (InterruptedException e) {
                log.warn("回传激活消息消费失败,不再重试!!!");
            }
        }
    });
}

Copyright © 2023,版权所有 - 小太阳的博客 - 黑ICP备2023000004号