赵宇博的技术博客 赵宇博的技术博客
首页
前端
后端
数据库专栏
k8s专栏
分布式专栏
Linux网络专栏
手写系列专栏
随笔
关于
GitHub (opens new window)
首页
前端
后端
数据库专栏
k8s专栏
分布式专栏
Linux网络专栏
手写系列专栏
随笔
关于
GitHub (opens new window)
  • JVM专题

  • 源码专题

  • Activi6专题

  • 杂谈

    • jar启动和IDE里启动Sprintboot的区别
    • 多线程场景解决方案汇总
      • 1、场景一:多接口请求并能及时中断问题
        • 1、实现思路1:CompletableFuture
        • 2、实现思路2:CountDownLatch
      • 2、场景二:细粒度加锁场景
        • 1、实现思路:ConcurrentHashMap
  • 后端
  • 杂谈
zhaoyb
2024-10-31
目录

多线程场景解决方案汇总

# 多线程场景解决方案汇总

针对于自己多线程场景的弱项,在这边专门记录一下遇到的场景问题,并给出实际解决方案,增加自己的技术能力


# 1、场景一:多接口请求并能及时中断问题

详细需求:

研发一个接口,当前接口会请求N个外部服务,使用多线程去优化调用性能,并且能保证任意一个接口出错的情况下,立即中断所有的接口调用,并进行数据返回

# 1、实现思路1:CompletableFuture

可以使用固定线程池newFixedThreadPool,创建包含N个线程的线程池去执行接口调用

然后使用CompletableFuture.anyOfTheseComplete来监听任何一个CompletableFuture任务的完成。一旦任何一个任务完成,我们检查是否有异常发生。如果有异常,我们取消所有其他任务。

shutdownNow方法被用来尝试立即停止所有正在执行的任务

具体代码如下:

package org.example.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * @author zhaoyubo
 * @date 2024/10/31 9:55
 */
public class Test {

    private ExecutorService executorService;

    public Test() {
        // 创建一个固定大小的线程池
        this.executorService = Executors.newFixedThreadPool(3);
    }

    public void shutdown() {
        // 停止线程池
        executorService.shutdownNow(); // 使用shutdownNow来尝试立即停止所有正在执行的任务
    }

    public String[] querySystems() {
        // 异步调用三个系统的接口
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(this::querySystem1, executorService);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(this::querySystem2, executorService);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(this::querySystem3, executorService);

        CompletableFuture<Object> anyOfTheseComplete = CompletableFuture.anyOf(future1, future2, future3);
        anyOfTheseComplete.thenRunAsync(() -> {
            try {
                // 检查是否有异常
                if (future1.isCompletedExceptionally() || future2.isCompletedExceptionally() || future3.isCompletedExceptionally()) {
                    // 取消所有任务
                    future1.cancel(true);
                    future2.cancel(true);
                    future3.cancel(true);
                }
            } catch (Exception e) {
                // 可以在这里记录日志
            }
        }, executorService);

        try {
            // 等待所有任务完成或任何一个任务出现异常
            CompletableFuture.allOf(future1, future2, future3).join();
        } catch (Exception e) {
            // 如果任何一个任务出现异常,将返回错误信息
            return new String[]{"有接口出现异常,直接返回....."};
        }

        // 如果所有任务都成功完成,返回结果数组
        return new String[]{future1.join(), future2.join(), future3.join()};
    }

    private String querySystem1()  {
        // 模拟调用第一个系统的接口
        System.out.println("--- 服务 1 开始调用 ---");
        // 模拟延迟
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("--- 服务 1 结束调用 ---");
        return "服务调用 1";
    }

    private String querySystem2()   {
        // 模拟调用第二个系统的接口
        System.out.println("--- 服务 2 开始调用 ---");
        // 模拟延迟
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
         int i = 1/0;
        System.out.println("--- 服务 2 结束调用 ---");
        return "服务调用 2";
    }

    private String querySystem3()   {
        // 模拟调用第三个系统的接口
        System.out.println("--- 服务 3 开始调用 ---");
        // 模拟延迟
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        };
        System.out.println("--- 服务 3 结束调用 ---");
        return "服务调用 3";
    }

    public static void main(String[] args) {
        Test service = new Test();
        try {
            String[] result = service.querySystems();
            System.out.println("接口调用结果:");
            for (String  s: result) {
                System.out.println(s);
            }

        } finally {
            service.shutdown();
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

模拟结果:正常调用结果

image-20241031095910304

模拟结果:出现异常结果:

image-20241031104432725

# 2、实现思路2:CountDownLatch

待补充...

# 2、场景二:细粒度加锁场景

详细需求:根据主单ID计算并更新n个子表数据,为了保证数据的并发安全,需要把全部的数据更新进行加锁处理,但是整体synchronized加锁的话,会影响效率,比如id为1的更新并不影响id为2的更新,所以为了提交并发度,需要进行已ID为标志的细粒度加锁,如何实现?

# 1、实现思路:ConcurrentHashMap

可以使用ConcurrentHashMap存储一个ID和锁的映射关系,这样的话,同一个主键就能使用同一个锁

具体代码如下:

package lock;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author zhaoyubo
 * @date 2024/11/14 9:48
 * @return 
 */
public class IDLockManager {

    // 使用一个Map来存储每个主键ID对应的锁对象,这样同一个主键ID的操作会使用同一个锁
    private static final ConcurrentHashMap<Long, Lock> idLocks = new ConcurrentHashMap<>();

    public static Lock getLock(Long primaryKeyId) {
        return idLocks.computeIfAbsent(primaryKeyId, k -> new ReentrantLock());
    }
}


package lock;

import java.util.concurrent.locks.Lock;

public class DataTableUpdater {

    /**
     * 根据主键计算并更新子表数据
     * @author zhaoyubo
     * @date 2024/11/14 11:27
     * @return 
     */
    public void updateById(Long primaryKeyId) {
        Lock lock = IDLockManager.getLock(primaryKeyId);
        //观察总共生成了几个锁
        System.out.println(lock);
        lock.lock();
        try {
            // 这里编写真正更新第一个数据表的具体逻辑代码
            System.out.println("正在更新数据表,主键ID: " + primaryKeyId);
            updateData1(primaryKeyId);
            updateData2(primaryKeyId);
            String name = Thread.currentThread().getName();
            System.out.println(name+"更新数据表完成,主键ID: " + primaryKeyId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void updateData1(Long primaryKeyId)throws InterruptedException{
        // 模拟更新操作耗时,比如线程休眠一会儿(实际场景替换为真实的数据库操作)
        Thread.sleep(100);
        String name = Thread.currentThread().getName();
        System.out.println(name+"更新子表1-完成,主键ID: " + primaryKeyId);
    }

    public void updateData2(Long primaryKeyId)throws InterruptedException{
        // 模拟更新操作耗时,比如线程休眠一会儿(实际场景替换为真实的数据库操作)
        Thread.sleep(500);
        String name = Thread.currentThread().getName();
        System.out.println(name+"更新子表2-完成,主键ID: " + primaryKeyId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

模拟测试:

package lock;

public class Main {
    public static void main(String[] args) {
        DataTableUpdater updater = new DataTableUpdater();

        // 模拟多个线程同时更新不同主键ID的数据表,这里创建了两个线程,分别操作不同主键ID
        Thread thread1 = new Thread(() -> updater.updateById(1L));
        Thread thread2 = new Thread(() -> updater.updateById(3L));
        Thread thread3 = new Thread(() -> updater.updateById(2L));
        Thread thread4 = new Thread(() -> updater.updateById(2L));
        Thread thread5 = new Thread(() -> updater.updateById(2L));
        Thread thread6 = new Thread(() -> updater.updateById(2L));

        // 启动线程
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
        thread6.start();

        try {
            // 等待线程执行完成
            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();
            thread5.join();
            thread6.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("所有更新操作完成");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

测试结果:

image-20241114113500316

(Q&A)再思考一下会不会有别的问题?

Q:使用Map存储数据,目前没有清除操作,时间长了,之前的ID也不会再做更新操作,那么就会产生内存泄漏

A:这个功能设计的初衷就是为了避免当前数据的并发更新,那么只要数据不再进行更新,这个锁数据就没有用了,由此可以进行设计,在数据没有任何编辑操作(终审之后),就可以进行Map数据的清除

public class IDLockManager {

    // 使用一个Map来存储每个主键ID对应的锁对象,这样同一个主键ID的操作会使用同一个锁
    private static final ConcurrentHashMap<Long, Lock> idLocks = new ConcurrentHashMap<>();

    public static Lock getLock(Long primaryKeyId) {
        return idLocks.computeIfAbsent(primaryKeyId, k -> new ReentrantLock());
    }

    /**
     * 清除无效数据
     * @author zhaoyubo
     * @date 2024/11/14 11:40
     * @return 
     */
    public static void remove(Long primaryKeyId){
        idLocks.remove(primaryKeyId);
    }
}

// 在数据终审的回调中进行lock的清除
auditCallBack(Long id){
    IDLockManager.remove(id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#多线程
上次更新: 2024/11/22, 16:46:14
jar启动和IDE里启动Sprintboot的区别

← jar启动和IDE里启动Sprintboot的区别

最近更新
01
Activiti6-业务实现
12-06
02
Activiti6-API详解
11-28
03
SpringBoot集成Activiti和UI
11-21
更多文章>
Theme by Vdoing | Copyright © 2022-2024 赵宇博 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式