Welcome
提供一个可供参考的线程池配置。重点是对线程数量的配置和拒绝策略的制定。
1. 线程数配置策略
线程池的理想大小取决于被提交任务的类型以及所部属系统的特性。 在代码中通常不会固定线程池的大小,而应该通过某种配置机制来 提供,或者根据 Runtime.availableProcessors
来动态计算。
幸运的是,要设置线程池的大小也并不困难,只需要避免“过大” 和 “过小” 这两种极端情况。
- 如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。
- 如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
要想正确的配置线程池的大小,必须分析计算环境、资源预算和任务的特性。 在部署的系统中有多少个CPU?多大的内存?任务是 计算密集型、I/O密集型还是二者皆可?他们是在需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且他们之间的行为 相差很啊,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
要正确的设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。 这种估算不需要很精确,并且可以通过一些分析或 监控工具来获得。 你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。
给定下列定义:
- Ncpu = number of CPUs (CPU 数量)
- Ucpu = target CPU utilization (CPU 目标利用率),0 <= Ucpu <=1
- W/C = ratio of wait time to compute time (等待时间与计算时间的比率)
要使处理器达到期望的使用率,线程池的最优大小等于:
Nthreads = Ncpu * Ucpu * (1+ W/C)
可以通过 Runtime
来获得 CPU 的数量:
1
int N_CPUS = Runtime.getRuntime().availableProcessors();
注意,CPU 周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的 约束条件:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限!
计算密集型的任务可以设定:Nthreads = Ncpu + 1,通常能实现最优的利用率。即使当计算密集型的线程 偶尔由于页缺失故障或者其他原因而暂停时,这个 “额外” 的线程也能确保 CPU 的时钟周期不会被浪费。
I/O操作任务或其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。
———— 以上内容摘自 《Java 并发编程实战》 第8章第2节 ————
2. 示例配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.github.config;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadFactory;
/**
* 线程池配置
*
* @author Oriental Ming
*/
@Configuration
public class ThreadPoolTaskConfig {
/**
* 定时任务之核心线程数量
*/
private static final int CORE_POOL_SIZE;
/**
* 最大线程池的数量默认是机器逻辑核数量
*/
private static final int MAXIMUM_POOL_SIZE;
/**
* 线程存活时间,单位秒
*/
private static final int KEEP_ALIVE_TIME = 60;
/**
* 队列的长度
*/
private static final int QUEUE_CAPACITY = 100;
/**
* 线程工厂名称
*/
private static final ThreadFactory FACTORY;
/**
* 线程池命名前缀
*/
private static final String THREAD_NAME_PREFIX = "self-worker-";
static {
int computerCoreSize = Runtime.getRuntime().availableProcessors();
CORE_POOL_SIZE = computerCoreSize << 1;
MAXIMUM_POOL_SIZE = computerCoreSize << 2;
FACTORY = new ThreadFactoryBuilder().setNamePrefix(THREAD_NAME_PREFIX).setDaemon(true).build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
poolTaskExecutor.setMaxPoolSize(MAXIMUM_POOL_SIZE);
poolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
poolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
poolTaskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
poolTaskExecutor.setThreadFactory(FACTORY);
// 当缓存队列和MaxPoolSize达到上限后,执行自定义拒绝策略
poolTaskExecutor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
poolTaskExecutor.setAllowCoreThreadTimeOut(true);
poolTaskExecutor.initialize();
return poolTaskExecutor;
}
}
3. 自定义拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.github.config;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
/**
* 自定义线程池拒绝策略
* <pre>
* 创建新线程去执行任务
* </pre>
*
* @author Oriental Ming
*/
@Slf4j
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
/**
* 记录新增线程数量
*/
private static final LongAdder NUMBER = new LongAdder();
/**
* 记录新增线程数量
*/
private static final LongAdder RESET_NUMBER = new LongAdder();
/**
* 拒绝策略新增线程数量记录单位20, 每新增20重新记录
* <pre>
* 总新增线程数计算公式: RESET_NUMBER * 20 + (NUMBER+1)
* </pre>
*/
private static final int MAX_LIMIT = 19;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
int currentNumber = NUMBER.intValue();
if (currentNumber >= MAX_LIMIT) {
NUMBER.reset();
RESET_NUMBER.increment();
log.info("线程池拒绝策略, 新增线程计数重置第{}次", RESET_NUMBER.intValue());
}
NUMBER.increment();
new Thread(r, "new-add-Thread-" + currentNumber).start();
log.info("触发线程池拒绝策略, 新增线程, 编号: {}", currentNumber);
}
}
完结撒花 😂 ! 制作不易,如引用原文,必须附此原文链接,否则违者必究!😈