H. 保证顺序的事件处理器
H. 保证顺序的事件处理器
更新日期:2021-01-10
1. 需求
想创建下面这样的一个事件处理器:
- 有几个其他线程会不定期的生成一些事件
- 这些事件将被发送给一个事件处理器
- 希望事件在处理的时候能严格按照先后顺序
这个问题其实是一个生产者-消费者模式,只不过消费者限定为一个。
2. 实现
由于消费者只有一个,所以消费者可以采用单线程来模拟。
在Java中,可以使用Executors.newSingleThreadExecutor()创建一个单线程的处理器。然后每接受一个事件就给处理器提交一个任务。
这些任务被自动积攒在处理器内部,由于只有一个线程来处理这些任务,所以必然是按照顺序一个一个处理掉。
按照这个思路,我写了如下的演示代码。代码中同时包含了测试性能、统计时间的功能。
保证处理顺序的事件处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 | import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class App2 {
public static void main(String[] args) {
App2 app = new App2();
app.start();
}
// 程序开始
private void start() {
// 创建一个事件处理器
MyEventHandle myEventHandle = new MyEventHandle();
// 开启4个线程同时发送事件通知
new Thread(new SendEventTask(1, myEventHandle)).start();
new Thread(new SendEventTask(2, myEventHandle)).start();
new Thread(new SendEventTask(3, myEventHandle)).start();
new Thread(new SendEventTask(4, myEventHandle)).start();
}
// 投放事件线程
public class SendEventTask implements Runnable {
// 线程ID
int id;
// 事件处理器
MyEventHandle handle;
public SendEventTask(int id, MyEventHandle handle) {
this.id = id;
this.handle = handle;
}
@Override
public void run() {
// 不断的投入新的事件
int max = 100000;
long startTime = System.nanoTime();
for (int i = 0; i < max; i++) {
MyEvent event = new MyEvent(id + "-" + i);
handle.sendEvent(event);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long time = (System.nanoTime() - startTime) / 1000 / 1000 / 1000;
System.out.println("线程" + id + "共耗时:" + time + "秒");
}
}
// 事件
public class MyEvent {
// 事件ID
private String id;
// 事件数据
private Object data;
// 构造方法
public MyEvent(String id) {
this.id = id;
// 随便创建一个大小不均的数据块来模拟实际运行过程中的各种不同数据
int size = (int)(Math.random() * 1024) + 1;
this.data = new int[size];
}
}
// 我的事件处理器
public class MyEventHandle {
// 事件处理线程
Executor executor = Executors.newSingleThreadExecutor();
// 时间
long lastTime = System.nanoTime();
// 发送一个事件
public void sendEvent(MyEvent event) {
// 创建一个任务,将提交给处理线程
executor.execute(new MyTask(event));
}
// 事件处理函数
public void handleEvent(MyEvent event) {
// 输出处理间隔时间
long time = System.nanoTime() - lastTime;
lastTime = System.nanoTime();
System.out.println("处理间隔 : " + (double)time / 1000000);
// 输出ID和数据大小
String msg = String.format("ID(%s) : 数据大小为%d", event.id, ((int[])event.data).length);
System.out.println(msg);
}
// 事件处理任务
public class MyTask implements Runnable {
// 事件
private MyEvent event;
public MyTask(MyEvent event) {
this.event = event;
}
@Override
public void run() {
// 调用事件处理函数
handleEvent(event);
}
}
}
}
|
程序的运行结果是,四个线程都在同一秒内结束发送任务。事件处理器处理每个任务的间隔在平均2ms左右。最高不超过5ms。从性能上来说,完全能够满足使用需求。
最后几行输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | ... ...
ID(1-99990) : 数据大小为50
处理间隔 : 1.9529
ID(1-99991) : 数据大小为873
处理间隔 : 1.996001
ID(1-99992) : 数据大小为473
处理间隔 : 1.9889
ID(1-99993) : 数据大小为1012
处理间隔 : 2.0208
ID(1-99994) : 数据大小为465
处理间隔 : 2.021
ID(1-99995) : 数据大小为313
处理间隔 : 1.1019
ID(1-99996) : 数据大小为95
处理间隔 : 1.840299
ID(1-99997) : 数据大小为307
处理间隔 : 3.004901
ID(1-99998) : 数据大小为167
处理间隔 : 2.0283
ID(1-99999) : 数据大小为400
线程1共耗时:195秒
|