Java-ThreadPool

在软件开发中,池一直都是一种非常优秀的设计思想,通过建立池可以有效的利用系统资源,节约系统性能。Java 中的线程池就是一种非常好的实现,从 JDK 1.5 开始 Java 提供了一个线程工厂 Executors 用来生成线程池,通过 Executors 可以方便的生成不同类型的线程池。但是要更好的理解使用线程池,就需要了解线程池的配置参数意义以及线程池的具体工作机制

线程池可以节约什么资源

多线程流行的原因是因为他能够处理与多进程一样的功能,并且创建线程耗费的时间、资源少,共享进程的资源。多线程有各自的线程ID,栈,PC,寄存器集合组成。共享代码段,文件,数据。
在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM里创建太多的线程,可能会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。
进程是资源管理的最小单元;而线程是程序执行的最小单元。

线程池的好处

引用自http://ifeve.com/java-threadpool/

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.junit.Test;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @version V1.0
*/
public class ThreadPoolTest {

/** 参数初始化 */
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/** 核心线程数量大小 */
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
/** 线程池最大容纳线程数 */
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
/** 线程空闲后的存活时长 */
private static final int keepAliveTime = 30;

/** 任务过多后,存储任务的一个阻塞队列 */
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();

/** 线程的创建工厂 */
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
}
};

/** 线程池任务满载后采取的任务拒绝策略 */
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.DiscardOldestPolicy();

/** 线程池对象,创建线程 */
ThreadPoolExecutor execute = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectHandler
);

}

在Java中,线程池的概念是Executor这个接口,具体实现为ThreadPoolExecutor类,学习Java中的线程池,就可以直接学习ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)

//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

具体参数介绍
corePoolSize 线程池的核心线程数。在没有设置 allowCoreThreadTimeOut 为 true 的情况下,核心线程会在线程池中一直存活,即使处于闲置状态。

maximumPoolSize 线程池所能容纳的最大线程数。当活动线程(核心线程+非核心线程)达到这个数值后,后续任务将会根据 RejectedExecutionHandler 来进行拒绝策略处理。

keepAliveTime 非核心线程闲置时的超时时长。超过该时长,非核心线程就会被回收。若线程池通过 allowCoreThreadTimeOut() 方法设置 allowCoreThreadTimeOut 属性为 true,则该时长同样会作用于核心线程,AsyncTask 配置的线程池就是这样设置的。

keepAliveTime 时长对应的单位。

unit
TimeUnit是一个枚举类型,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天

workQueue 线程池中的任务队列,通过线程池的 execute() 方法提交的 Runnable 对象会存储在该队列中。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务

常用的workQueue类型:

SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大

LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize

ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误

DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务

ThreadFactory 线程工厂,功能很简单,就是为线程池提供创建新线程的功能。这是一个接口,可以通过自定义,做一些自定义线程名的操作。

RejectedExecutionHandler 当任务无法被执行时(超过线程最大容量 maximum 并且 workQueue 已经被排满了)的处理策略,这里有四种任务拒绝类型。

线程池工作原则
1、当线程池中线程数量小于 corePoolSize 则创建线程,并处理请求。
2、当线程池中线程数量大于等于 corePoolSize 时,则把请求放入 workQueue 中,随着线程池中的核心线程们不断执行任务,只要线程池中有空闲的核心线程,线程池就从 workQueue 中取任务并处理。
3 、当 workQueue 已存满,放不下新任务时则新建非核心线程入池,并处理请求直到线程数目达到 maximumPoolSize(最大线程数量设置值)。
4、如果线程池中线程数大于 maximumPoolSize 则使用 RejectedExecutionHandler 来进行任务拒绝处理。
任务队列 BlockingQueue
任务队列 workQueue 是用于存放不能被及时处理掉的任务的一个队列,它是 一个 BlockingQueue 类型。

关于 BlockingQueue,虽然它是 Queue 的子接口,但是它的主要作用并不是容器,而是作为线程同步的工具,他有一个特征,当生产者试图向 BlockingQueue 放入(put)元素,如果队列已满,则该线程被阻塞;当消费者试图从 BlockingQueue 取出(take)元素,如果队列已空,则该线程被阻塞。(From 疯狂Java讲义)

任务拒绝类型
ThreadPoolExecutor.AbortPolicy:
当线程池中的数量等于最大线程数时抛 java.util.concurrent.RejectedExecutionException 异常,涉及到该异常的任务也不会被执行,线程池默认的拒绝策略就是该策略。

ThreadPoolExecutor.DiscardPolicy():
当线程池中的数量等于最大线程数时,默默丢弃不能执行的新加任务,不报任何异常。

ThreadPoolExecutor.CallerRunsPolicy():
当线程池中的数量等于最大线程数时,重试添加当前的任务;它会自动重复调用execute()方法。

ThreadPoolExecutor.DiscardOldestPolicy():
当线程池中的数量等于最大线程数时,抛弃线程池中工作队列头部的任务(即等待时间最久的任务),并执行新传入的任务。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* @version V1.0
*/
public class ThreadPoolTest {

private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTest.class);

@Test
public void testExecute() {

logger.info("开始");

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));

for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
logger.info("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:"
+ executor.getQueue().size() + ",已执行玩别的任务数目:"
+ executor.getCompletedTaskCount());
}

executor.shutdown();
logger.info("线程池关闭。");

logger.info("执行完了。");
}

class MyTask implements Runnable {

private int taskNum;

public MyTask(int num) {
this.taskNum = num;
}

@Override
public void run() {
logger.info("正在执行task " + taskNum);
try {
// 0.004ns
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
logger.error(" {}", e.toString());
}
logger.info("task " + taskNum + "执行完毕");
}
}

@Test
public void testSubmit() {

ExecutorService executorService = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
//ExecutorService executorService = Executors.newCachedThreadPool();

List<Future<String>> resultList = new ArrayList<>();

// 创建10个任务并执行
for (int i = 0; i < 10; i++) {
// 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
// 将任务执行结果存储到List中
resultList.add(future);
}

// 遍历任务的结果
for (Future<String> fs : resultList) {
try {

while (!fs.isDone()) ; // Future返回如果没有完成,则一直循环等待,直到Future返回完成

logger.info("执行结果:" + fs.get()); // 打印各个线程(任务)执行的结果

} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}

class TaskWithResult implements Callable<String> {
private int id;

public TaskWithResult(int id) {
this.id = id;
}

/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法, 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
logger.info("call()方法被自动调用。 当前线程名:" + Thread.currentThread().getName());
// 该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName();
}

} // end class TaskWithResult

} // end class

我们可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。

References

[1] 关于线程池的执行原则及配置参数详解
[2] 线程池,这一篇或许就够了
[3] 多线程 线程池ThreadPoolExecutor介绍
[4] 聊聊并发(三)Java线程池的分析和使用
[5] ThreadPoolExecutor里面4种拒绝策略(详细)
[6] 聊聊并发(三)——JAVA线程池的分析和使用
[7] Java并发编程:线程池的使用
[8] Java四种线程池的使用
[9] 并发新特性—Executor 框架与线程池
[10] 线程池踩坑记 –load飙高的原因
[11] 操作系统之线程
[12] 为什么要使用线程池