Welcome
在实际的生产开发环境中,有时会遇到需要简易并行处理的场景。譬如, 多项任务之间有几点共性:
- 具有相同/类型的返回结果
- 任务与任务具备隔离性(相互没有影响)
- 每项任务的执行都比较耗时
示例如下:
1
2
3
4
5
6
7
8
| // 批量保存 xxxx 对象。 耗时:5s
boolean res1 = service1.saveBatch(xxxx);
// 批量更新 yyyy 对象。耗时:4s
boolean res2 = service2.updateBatchByIds(yyyy);
// 通过平台(HTTP)发送消息,并等待结果。耗时:6s
boolean res3 = service3.sendMsg(jjjj);
// 通过 RPC 向其他 Module 操作,并等待结果。耗时:3s
boolean res4 = service4.rpcOperate(jjjj);
|
此时,我们既可以针对此情景,异步执行每一项任务,并在最后收集结果。
最终的目的:扩大资源利用率,减少任务列表执行的总耗时。 如上示例串行需要耗时 18秒,而并行只需要 6秒左右!
1. 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| <dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.0</version>
<scope>test</scope>
</dependency>
</dependencies>
|
2. 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
| package org.example.starter.util;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
/**
* 并行工具类。
* <pre>
* 适用:并行的任务中 I/O 耗时长的场景
* 1. 具有相同/类型的返回结果
* 2. 任务与任务具备隔离性(相互没有影响)
* 3. 每项任务的执行都比较耗时
* </pre>
*
* @author Oriental Ming
* @date 2023/5/11 19:58
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ParallelUtil {
/**
* 初始化公共的缓存线程池。
* 如果任务特别多,耗时过长,队列时间太长容易出现长时间不执行的问题
*/
private static final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();
/**
* 对任务列表中的每一项任务异步执行,并收集每一项任务的执行结果(自研)
*
* @param taskList 任务列表
* @param <T> 对象
* @return 任务结果集
*/
@SneakyThrows
public static <T> Set<T> execAsyncBatchByMine(Collection<Supplier<T>> taskList) {
// 异步执行,并收集异步过程
LinkedList<Future<T>> collectFuture = new LinkedList<>();
for (Supplier<T> supplier : taskList) {
// 借用 Hutool 封装
Future<T> future = CACHED_THREAD_POOL.submit(supplier::get);
collectFuture.add(future);
}
// 循环获取结果
Set<T> result = new HashSet<>();
// 阻塞循环收集并行集合的结果
int index = 0;
while (!collectFuture.isEmpty()) {
Future<T> future = collectFuture.get(index);
if (future.isDone()) {
// 归并异步执行的结果
result.add(future.get());
// 删除已执行完毕的任务(底层原理是链表的解链)
collectFuture.remove(index);
// 如果有执行完毕的,就从头开始检查
index = 0;
continue;
}
// 如果循环完成后,任务存在还没有执行完的,则从头开始
if (collectFuture.size() == ++index) {
index = 0;
}
}
return result;
}
/**
* 对任务列表中的每一项任务异步执行,并收集每一项任务的执行结果(Guava 实现)
*
* @param taskList 任务列表
* @param <T> 对象
* @return 任务结果集
*/
@SneakyThrows
public static <T> Set<T> execAsyncBatchByGuava(Collection<Callable<T>> taskList) {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(CACHED_THREAD_POOL);
List<Future<T>> futures = executorService.invokeAll(taskList);
Set<T> result = new HashSet<>();
for (Future<T> future : futures) {
result.add(future.get());
}
return result;
}
}
|
3. Unit Test
3.1 Abstract class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package org.example;
import cn.hutool.core.util.RandomUtil;
import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* 公共的异步测试类
*
* @author Oriental Ming
* @date 2023/5/11 20:18
*/
public abstract class AbstractAsyncTest {
@SneakyThrows
protected int sleepTime(int o, Consumer<String> consumer) {
int sleepMillSec = RandomUtil.randomInt(3000, 7000);
Thread thread = Thread.currentThread();
String printMsg = String.format("对象: %d,\t线程ID: %d, \t线程name: %s,\t 延迟 %d 毫秒 \n", o, thread.getId(), thread.getName(), sleepMillSec);
consumer.accept(printMsg);
TimeUnit.MILLISECONDS.sleep(sleepMillSec);
return o;
}
}
|
3.2 ParallelUtil Test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
| package org.example.starter.util;
import org.example.AbstractAsyncTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class ParallelUtilTest extends AbstractAsyncTest {
List<Supplier<Integer>> params1;
List<Callable<Integer>> params2;
@BeforeEach
void setUp() {
params1 = new ArrayList<>(6);
params2 = new ArrayList<>(6);
for (int i = 0; i < 6; i++) {
int finalI = i;
params1.add(() -> sleepTime(finalI, System.out::println));
params2.add(() -> sleepTime(finalI, System.out::println));
}
}
/**
* 串行执行(作为结果的对比)
*/
@Test
void serialExec() {
long start = System.currentTimeMillis();
Set<Integer> result = Sets.newHashSet();
for (Supplier<Integer> supplier : params1) {
result.add(supplier.get());
}
System.err.printf("----------总耗时 %d 毫秒------------ \n", (System.currentTimeMillis() - start));
assertNotNull(result);
System.out.println(result);
assertEquals(6, result.size());
}
@Test
void execAsyncBatchByMine() {
long start = System.currentTimeMillis();
Set<Integer> result = ParallelUtil.execAsyncBatchByMine(params1);
System.err.printf("----------总耗时 %d 毫秒------------ \n", (System.currentTimeMillis() - start));
assertNotNull(result);
System.out.println(result);
assertEquals(6, result.size());
}
@Test
void execAsyncBatchByGuava() {
long start = System.currentTimeMillis();
Set<Integer> result = ParallelUtil.execAsyncBatchByGuava(params2);
System.err.printf("----------总耗时 %d 毫秒------------ \n", (System.currentTimeMillis() - start));
assertNotNull(result);
System.out.println(result);
assertEquals(6, result.size());
}
}
|
4. 测试结果
4.1 serialExec Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| 对象: 0, 线程ID: 1, 线程name: main, 延迟 6874 毫秒
对象: 1, 线程ID: 1, 线程name: main, 延迟 3484 毫秒
对象: 2, 线程ID: 1, 线程name: main, 延迟 4127 毫秒
对象: 3, 线程ID: 1, 线程name: main, 延迟 3917 毫秒
对象: 4, 线程ID: 1, 线程name: main, 延迟 4705 毫秒
对象: 5, 线程ID: 1, 线程name: main, 延迟 5731 毫秒
----------总耗时 28874 毫秒------------
[0, 1, 2, 3, 4, 5]
|
4.2 execAsyncBatchByMine Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
对象: 1, 线程ID: 17, 线程name: pool-1-thread-2, 延迟 6475 毫秒
对象: 0, 线程ID: 16, 线程name: pool-1-thread-1, 延迟 5831 毫秒
对象: 2, 线程ID: 18, 线程name: pool-1-thread-3, 延迟 4535 毫秒
对象: 3, 线程ID: 19, 线程name: pool-1-thread-4, 延迟 3002 毫秒
对象: 4, 线程ID: 20, 线程name: pool-1-thread-5, 延迟 5289 毫秒
对象: 5, 线程ID: 21, 线程name: pool-1-thread-6, 延迟 6412 毫秒
----------总耗时 6495 毫秒------------
[0, 1, 2, 3, 4, 5]
|
4.3 execAsyncBatchByGuava Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| 对象: 2, 线程ID: 16, 线程name: pool-1-thread-1, 延迟 5643 毫秒
对象: 4, 线程ID: 18, 线程name: pool-1-thread-3, 延迟 6600 毫秒
对象: 3, 线程ID: 20, 线程name: pool-1-thread-5, 延迟 5497 毫秒
对象: 5, 线程ID: 19, 线程name: pool-1-thread-4, 延迟 5708 毫秒
对象: 0, 线程ID: 17, 线程name: pool-1-thread-2, 延迟 4110 毫秒
对象: 1, 线程ID: 21, 线程name: pool-1-thread-6, 延迟 5167 毫秒
----------总耗时 6612 毫秒------------
[0, 1, 2, 3, 4, 5]
|
完结撒花 😂 ! 制作不易,如引用原文,必须附此原文链接,否则违者必究!😈