- 自定义线程池,便于调试和控制线程
- 自定义线程数、拒绝策略、关闭等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
1644j
public class CustomTaskThreadPool implements CustomThreadPool {
/**
* 核心大小
*/
private static int CORE_POOLSIZE = Runtime.getRuntime().availableProcessors() * 2;
/**
* 上限
*/
private static int MAX_POOLSIZE = CORE_POOLSIZE * 10;
//留存时间ms
private static long KEEP_ALIVETIME = 300000L;
private static long SHUTDOWN_WAIT_TIME = 300000L;
/**
* 时间单位
*/
private static TimeUnit UNIT = TimeUnit.MILLISECONDS;
/**
* 队列容量
*/
private static int CAPACITY = 20;
/**
* 队列
*/
private static BlockingQueue<Runnable> WORK_QUEUE;
/**
* 线程工厂
*/
private static ThreadFactory THREAD_FACTORY;
/**
* 默认拒绝策略
*/
private static RejectedExecutionHandler REJECTED_EXECUTION_HANDLER;
private static ThreadPoolExecutor executor = null;
private static class CustomTaskThreadPoolFactory {
public static CustomTaskThreadPool CustomTaskThreadPool = new CustomTaskThreadPool();
}
/**
* 线程池单例
*
* @return
*/
public static CustomTaskThreadPool getInstance() {
return CustomTaskThreadPoolFactory.CustomTaskThreadPool;
}
public CustomTaskThreadPool() {
if (CORE_POOLSIZE <= 0 || CORE_POOLSIZE > 10) {
CORE_POOLSIZE = 4;
MAX_POOLSIZE = CORE_POOLSIZE * 10;
}
THREAD_FACTORY = new CustomThreadFactory();
WORK_QUEUE = new ArrayBlockingQueue<>(CAPACITY); REJECTED_EXECUTION_HANDLER = new AbortPolicy();
executor = new ThreadPoolExecutor(
CORE_POOLSIZE,
MAX_POOLSIZE,
KEEP_ALIVETIME,
UNIT,
WORK_QUEUE,
THREAD_FACTORY,
REJECTED_EXECUTION_HANDLER
);
}
static {
Runtime.getRuntime().addShutdownHook(new Thread(new CustomRunnable() {
public void run() {
close();
}
}));
}
private static void close() {
try {
log.warn("即将关闭线程池,等待:{}ms,活跃:{},未完成:{},最大完成:{},总完成:{}",
SHUTDOWN_WAIT_TIME,
executor.getActiveCount(),
executor.getQueue().size(),
executor.getLargestPoolSize(),
executor.getCompletedTaskCount());
executor.shutdown();
//等待关闭
executor.awaitTermination(SHUTDOWN_WAIT_TIME, UNIT);
log.warn("已关闭线程池");
//未完成
List<Runnable> CustomRunnableList = executor.shutdownNow();
if (!CollectionUtil.isEmpty(CustomRunnableList)) {
CustomRunnableList.forEach(e -> log.warn("关闭未完成任务:" + e.toString()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean execute(Runnable runnable) {
try {
executor.execute(new CustomRunnable(runnable));
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
return true;
}
public <T> Future<T> submit(Callable<T> tCallable) {
try {
return executor.submit(new CustomCallable<T>(tCallable));
} catch (RejectedExecutionException e) {
log.error(e.getMessage(), e);
}
return null;
}
public void shutDown() {
executor.shutdown();
}
/**
* 拒绝策略,丢弃不作任何处理
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String info = "";
try {
Field field = FutureTask.class.getDeclaredField("callable");
field.setAccessible(true);
FutureTask futureTask = (FutureTask) r;
Callable callable = (Callable) field.get(futureTask);
CustomCallable CustomCallable = (CustomCallable) ((CustomCallable) callable).callable;
info = CustomCallable.toString();
} catch (Exception ex) {
ex.printStackTrace();
}
log.error("AbortPolicy: " + info + " rejected from " + e.toString());
}
}
}
注意
计算密集型
线程数 = CPU核数+1,也可以设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务器的CPU有超线程)。
IO密集型
线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))对于阻塞系数,我们可以先试着猜测,抑或采用一些性能分析工具或java.lang.management API 来确定线程花在系统/IO操作上的时间与CPU密集任务所耗的时间比值。
Runtime.getRuntime().availableProcessors()
因为容器不是物理隔离的,使用Runtime.getRuntime().availableProcessors() ,会拿到物理CPU个数,而不是容器申请时的个数。
例如宿主机器是4核CPU16G内存
java 6/7/8/9
docker run –cpus 1 -m 1G -it adoptopenjdk/openjdk9:latest # 给1核
jshell -J-Xmx512M -v # 启动jshell
Runtime.getRuntime().availableProcessors() # 结果是不是1!!!
java 10
docker run –cpus 1 -m 1G -it adoptopenjdk/openjdk10:latest # 给1核
jshell -J-Xmx512M -v # 启动jshell
Runtime.getRuntime().availableProcessors() # 结果是1