线程池实现
# 线程池实现
# 1、整体架构设计:
初始化n个线程,并都处于wait状态,一直循环等待任务的进入,一旦有任务进入,则使用notify进行唤醒,获取某个线程进行任务的执行
# 2、代码实现:
线程池定义:
package pool;
/**
* @author zhaoyubo
* @title ThreadPool
* @description 自定义线程池
* @create 2024/4/10 14:37
**/
public interface ThreadPool<Job extends Runnable> {
/**执行任务*/
void execute(Job job);
/**关闭线程池*/
void shutdown();
/**增加工作者线程*/
void addWorkers(int num);
/**减少工作者线程*/
void removeWorker(int num);
/**得到正在等待执行的任务数*/
int getJobSize();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
默认线程池实现:
package pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author zhaoyubo
* @title DefaultThreadPool
* @description 线程池默认实现
* @create 2024/4/10 14:45
**/
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{
// 最大线程数量
private static final int MAX_WORKER_NUMBERS = 10;
// 默认线程数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 最小线程数量
private static final int MIN_WORKER_NUMBERS = 1;
// 任务列表
private final LinkedList<Job> jobs = new LinkedList<Job>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
// 工作者线程的数量
private int workerNum =DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();
//无参构造
public DefaultThreadPool() {initializeWokers(DEFAULT_WORKER_NUMBERS);}
//指定线程数量构造
public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num, MIN_WORKER_NUMBERS);
initializeWokers(workerNum);
}
private void initializeWokers(int num){
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}
@Override
public void execute(Job job) {
if(job != null){
synchronized (jobs){
//添加一个工作
jobs.addLast(job);
//并通知线程执行
jobs.notify();
}
}
}
@Override
public void shutdown() {
for (Worker worker : workers) {
worker.shoutdown();
}
}
@Override
public void addWorkers(int num) {
synchronized (jobs){
if(num + workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - workerNum;
}
initializeWokers(num);
workerNum += num;
}
}
@Override
public void removeWorker(int num) {
synchronized (jobs){
if(num >= workerNum){
throw new IllegalArgumentException("beyond workerNum");
}
//按照顺序移除
int count = 0;
while (count < num){
Worker worker = workers.get(count);
if(workers.remove(worker)){
worker.shoutdown();
count++;
}
}
workerNum -= count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
class Worker implements Runnable {
//是否工作
private volatile boolean running = true;
@Override
public void run() {
while (running){
Job job = null;
synchronized (jobs){
//如果工作队列为空,则等待
while (jobs.isEmpty()){
try {
System.out.println(Thread.currentThread().getName()+"没有任务,等待中......");
jobs.wait();
System.out.println(Thread.currentThread().getName()+"取消等待!开始干活");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
//取出一个工作
job = jobs.removeFirst();
}
if(job != null){
try {
System.out.println(Thread.currentThread().getName()+"接收到任务....");
job.run();
}catch (Exception e){
}
}
}
}
public void shoutdown(){
running = false;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
自定义任务:
package pool;
/**
* @author zhaoyubo
* @title Task
* @description 自定义任务
* @create 2024/4/10 15:23
**/
public class Task implements Runnable{
@Override
public void run() {
try {
System.out.println("任务开始.....");
Thread.sleep(1000);
System.out.println("任务结束.....");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
测试场景:
package pool;
/**
* @author zhaoyubo
* @title ThreadPoolTest
* @description 线程池测试
* @create 2024/4/10 15:24
**/
public class ThreadPoolTest {
//初始化线程池 10个线程
static DefaultThreadPool pool = new DefaultThreadPool(10);
public static void main(String[] args) throws InterruptedException {
Thread.sleep(1000);
// 有30个任务需要执行
for (int i = 0; i < 2; i++) {
pool.execute(new Task());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上次更新: 2024/04/10, 16:11:47