多线程场景解决方案汇总
# 多线程场景解决方案汇总
针对于自己多线程场景的弱项,在这边专门记录一下遇到的场景问题,并给出实际解决方案,增加自己的技术能力
# 1、场景一:多接口请求并能及时中断问题
详细需求:
研发一个接口,当前接口会请求N个外部服务,使用多线程去优化调用性能,并且能保证任意一个接口出错的情况下,立即中断所有的接口调用,并进行数据返回
# 1、实现思路1:CompletableFuture
可以使用固定线程池newFixedThreadPool,创建包含N个线程的线程池去执行接口调用
然后使用CompletableFuture.anyOfTheseComplete来监听任何一个CompletableFuture任务的完成。一旦任何一个任务完成,我们检查是否有异常发生。如果有异常,我们取消所有其他任务。
shutdownNow方法被用来尝试立即停止所有正在执行的任务
具体代码如下:
package org.example.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author zhaoyubo
* @date 2024/10/31 9:55
*/
public class Test {
private ExecutorService executorService;
public Test() {
// 创建一个固定大小的线程池
this.executorService = Executors.newFixedThreadPool(3);
}
public void shutdown() {
// 停止线程池
executorService.shutdownNow(); // 使用shutdownNow来尝试立即停止所有正在执行的任务
}
public String[] querySystems() {
// 异步调用三个系统的接口
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(this::querySystem1, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(this::querySystem2, executorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(this::querySystem3, executorService);
CompletableFuture<Object> anyOfTheseComplete = CompletableFuture.anyOf(future1, future2, future3);
anyOfTheseComplete.thenRunAsync(() -> {
try {
// 检查是否有异常
if (future1.isCompletedExceptionally() || future2.isCompletedExceptionally() || future3.isCompletedExceptionally()) {
// 取消所有任务
future1.cancel(true);
future2.cancel(true);
future3.cancel(true);
}
} catch (Exception e) {
// 可以在这里记录日志
}
}, executorService);
try {
// 等待所有任务完成或任何一个任务出现异常
CompletableFuture.allOf(future1, future2, future3).join();
} catch (Exception e) {
// 如果任何一个任务出现异常,将返回错误信息
return new String[]{"有接口出现异常,直接返回....."};
}
// 如果所有任务都成功完成,返回结果数组
return new String[]{future1.join(), future2.join(), future3.join()};
}
private String querySystem1() {
// 模拟调用第一个系统的接口
System.out.println("--- 服务 1 开始调用 ---");
// 模拟延迟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("--- 服务 1 结束调用 ---");
return "服务调用 1";
}
private String querySystem2() {
// 模拟调用第二个系统的接口
System.out.println("--- 服务 2 开始调用 ---");
// 模拟延迟
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
int i = 1/0;
System.out.println("--- 服务 2 结束调用 ---");
return "服务调用 2";
}
private String querySystem3() {
// 模拟调用第三个系统的接口
System.out.println("--- 服务 3 开始调用 ---");
// 模拟延迟
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
};
System.out.println("--- 服务 3 结束调用 ---");
return "服务调用 3";
}
public static void main(String[] args) {
Test service = new Test();
try {
String[] result = service.querySystems();
System.out.println("接口调用结果:");
for (String s: result) {
System.out.println(s);
}
} finally {
service.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
模拟结果:正常调用结果
模拟结果:出现异常结果:
# 2、实现思路2:CountDownLatch
待补充...
# 2、场景二:细粒度加锁场景
详细需求:根据主单ID计算并更新n个子表数据,为了保证数据的并发安全,需要把全部的数据更新进行加锁处理,但是整体synchronized加锁的话,会影响效率,比如id为1的更新并不影响id为2的更新,所以为了提交并发度,需要进行已ID为标志的细粒度加锁,如何实现?
# 1、实现思路:ConcurrentHashMap
可以使用ConcurrentHashMap存储一个ID和锁的映射关系,这样的话,同一个主键就能使用同一个锁
具体代码如下:
package lock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author zhaoyubo
* @date 2024/11/14 9:48
* @return
*/
public class IDLockManager {
// 使用一个Map来存储每个主键ID对应的锁对象,这样同一个主键ID的操作会使用同一个锁
private static final ConcurrentHashMap<Long, Lock> idLocks = new ConcurrentHashMap<>();
public static Lock getLock(Long primaryKeyId) {
return idLocks.computeIfAbsent(primaryKeyId, k -> new ReentrantLock());
}
}
package lock;
import java.util.concurrent.locks.Lock;
public class DataTableUpdater {
/**
* 根据主键计算并更新子表数据
* @author zhaoyubo
* @date 2024/11/14 11:27
* @return
*/
public void updateById(Long primaryKeyId) {
Lock lock = IDLockManager.getLock(primaryKeyId);
//观察总共生成了几个锁
System.out.println(lock);
lock.lock();
try {
// 这里编写真正更新第一个数据表的具体逻辑代码
System.out.println("正在更新数据表,主键ID: " + primaryKeyId);
updateData1(primaryKeyId);
updateData2(primaryKeyId);
String name = Thread.currentThread().getName();
System.out.println(name+"更新数据表完成,主键ID: " + primaryKeyId);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void updateData1(Long primaryKeyId)throws InterruptedException{
// 模拟更新操作耗时,比如线程休眠一会儿(实际场景替换为真实的数据库操作)
Thread.sleep(100);
String name = Thread.currentThread().getName();
System.out.println(name+"更新子表1-完成,主键ID: " + primaryKeyId);
}
public void updateData2(Long primaryKeyId)throws InterruptedException{
// 模拟更新操作耗时,比如线程休眠一会儿(实际场景替换为真实的数据库操作)
Thread.sleep(500);
String name = Thread.currentThread().getName();
System.out.println(name+"更新子表2-完成,主键ID: " + primaryKeyId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
模拟测试:
package lock;
public class Main {
public static void main(String[] args) {
DataTableUpdater updater = new DataTableUpdater();
// 模拟多个线程同时更新不同主键ID的数据表,这里创建了两个线程,分别操作不同主键ID
Thread thread1 = new Thread(() -> updater.updateById(1L));
Thread thread2 = new Thread(() -> updater.updateById(3L));
Thread thread3 = new Thread(() -> updater.updateById(2L));
Thread thread4 = new Thread(() -> updater.updateById(2L));
Thread thread5 = new Thread(() -> updater.updateById(2L));
Thread thread6 = new Thread(() -> updater.updateById(2L));
// 启动线程
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
try {
// 等待线程执行完成
thread1.join();
thread2.join();
thread3.join();
thread4.join();
thread5.join();
thread6.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有更新操作完成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
测试结果:
(Q&A)再思考一下会不会有别的问题?
Q:使用Map存储数据,目前没有清除操作,时间长了,之前的ID也不会再做更新操作,那么就会产生内存泄漏
A:这个功能设计的初衷就是为了避免当前数据的并发更新,那么只要数据不再进行更新,这个锁数据就没有用了,由此可以进行设计,在数据没有任何编辑操作(终审之后),就可以进行Map数据的清除
public class IDLockManager {
// 使用一个Map来存储每个主键ID对应的锁对象,这样同一个主键ID的操作会使用同一个锁
private static final ConcurrentHashMap<Long, Lock> idLocks = new ConcurrentHashMap<>();
public static Lock getLock(Long primaryKeyId) {
return idLocks.computeIfAbsent(primaryKeyId, k -> new ReentrantLock());
}
/**
* 清除无效数据
* @author zhaoyubo
* @date 2024/11/14 11:40
* @return
*/
public static void remove(Long primaryKeyId){
idLocks.remove(primaryKeyId);
}
}
// 在数据终审的回调中进行lock的清除
auditCallBack(Long id){
IDLockManager.remove(id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
上次更新: 2024/11/22, 16:46:14