code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99public class TickTocker {
private final static ConcurrentLinkedQueue<Ele> queue = new ConcurrentLinkedQueue<>();
private Consumer<Long> callback = null;
public final int period = 1000 * 5; //10s
public final long expirTime = 1000 * 12; //1min
public static TickTocker init(Consumer<Long> callback) {
TickTocker tickTocker = new TickTocker();
tickTocker.start(callback);
return tickTocker;
}
public void add(long key) {
queue.offer(new Ele(key));
}
private void start(Consumer<Long> callback) {
this.callback = callback;
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
doJob();
}
}, 1000, period);
}
private void doJob() {
//获取队列元素
Ele e = queue.peek();
//空队列什么都不执行
if (e == null) {
System.out.println(LocalDateTime.now() + " - Queue is empty.");
return;
}
//判断是否超时
if (isExpired(e.getCtime(), expirTime)) {
System.out.println(LocalDateTime.now() + " - KEY:" + e.getKey() + " is expired.");
queue.poll(); //从队列中移出第一个元素
callback.accept(e.getKey()); //执行回调函数
doJob();
} else {
// System.out.println(LocalDateTime.now() + " - Do nothing.");
}
}
private boolean isExpired(long ctime, long expirTime) {
// System.out.println("ctime: " + ctime + ", expirTime: " + expirTime + ", now: " + System.currentTimeMillis());
return ctime + expirTime < System.currentTimeMillis();
}
class Ele {
private long ctime;
private long key;
public Ele(long key) {
this.key = key;
this.ctime = System.currentTimeMillis();
}
public long getKey() {
return key;
}
public void setKey(long key) {
this.key = key;
}
public void setCtime(long ctime) {
this.ctime = ctime;
}
public long getCtime() {
return ctime;
}
}
public static void main(String[] args){
Random random = new Random();
System.out.println(LocalDateTime.now() + " - Program started.");
TickTocker tickTocker = TickTocker.init(key -> {
System.out.println("Do something for " + key);
});
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 100; i < 110; i++){
final int key = i;
exec.execute(()->{
try {
Thread.sleep(random.nextInt(20000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now() + " - add KEY: " + key);
tickTocker.add(key + 0L);
});
}
exec.shutdown();
}
}