主题
线程池
Java中的线程池是通过java.util.concurrent
包中提供的Executor框架实现的,提供了一种管理和控制线程的方法,以提高性能和资源利用率。
优势
重用存在的线程,减少对象创建、消亡的开销,性能佳
可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
提供定时执行、定期执行、单线程、并发数控制等功能
new Thread的劣势
每次 new Thread 新建对象,性能差
线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
缺少更多功能,如更多执行、定期执行、线程中断
工作原理
JVM先根据用户的参数创建一定数量的可运行的线程任务,并将其放入队列中,在线程创建后启动这些任务,如果线程数量超过了最大线程数量(用户设置的线程池大小),则超出数量的线程排队等候,在有任务执行完毕后,线程池调度器会发现有可用的线程,进而再次从队列中取出任务并执行。
工作流程
线程池刚被创建时,只是向系统申请一个用于执行线程队列和管理线程池的线程资源。在调用execute()添加一个任务时,线程池会按照以下流程执行任务。 1、如果正在运行的线程数量少于corePoolSize(用户定义的核心线程数),线程池就会立刻创建线程并执行该线程任务。
2、如果正在运行的线程数量大于等于corePoolSize,该任务就将被放入阻塞队列中。
3、在阻塞队列已满且正在运行的线程数量少于maximumPoolSize时,线程池会创建非核心线程立刻执行该线程任务。
4、在阻塞队列已满且正在运行的线程数量大于等于maximumPoolSize时,线程池将拒绝执行该线程任务并抛出RejectExecutionException异常。
5、在线程任务执行完毕后,该任务将被从线程池队列中移除,线程池将从队列中取下一个线程任务继续执行。
6、在线程处于空闲状态的时间超过keepAliveTime时间时,正在运行的线程数量超过corePoolSize,该线程将会被认定为空闲线程并停止。因此在线程池中所有线程任务都执行完毕后,线程池会收缩到corePoolSize大小。
核心组成
一个线程池包括以下四个基本组成部分
1.线程管理器(ThreadPool):用于创建并管理线程池,包括创建线程,销毁线程池添加新任务
2.工作线程(PoolWorker):线程池中线程,在没有处于等待状态时,可以循环的执行任务;
3.任务接口(Task): 每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等
4.任务队列TaskQueue):用于存放没有处理的任务。提供一种缓冲机制
Executor接口
这是线程池的核心接口,定义了执行Runnable任务的基本方法。
execute(Runnable command)`:提交一个Runnable任务给线程池去执行。
ExecutorService接口
扩展自Executor接口,提供了更丰富的线程池服务功能。 submit(Callable<T> task)
:提交一个Callable任务,并返回Future来获取计算结果。
shutdown()
:启动线程池的关闭过程,在不再接受新任务的同时,等待所有已提交的任务完成。
shutdownNow()
:尝试停止所有正在执行的任务并取消所有未开始的任务,然后返回尚未执行的任务列表。
awaitTermination(long timeout, TimeUnit unit)
:等待所有任务终止,或者达到指定的超时时间。
ThreadPoolExecutor
这是一个实现了ExecutorService接口的具体线程池实现类,它允许我们配置核心线程数、最大线程数、线程存活时间、工作队列策略等参数。
造函数参数: corePoolSize
:线程池维护的最小线程数量,即使这些线程空闲,它们也不会被回收。核心线程数
maximumPoolSize
:线程池能够容纳的最大线程数量。
keepAliveTime
:当线程池中的线程数量超过corePoolSize时,空闲线程等待新任务的时间(单位由TimeUnit
指定),超过这个时间就会被回收。
unit
:与keepAliveTime
配合使用的单位。
workQueue
:用于存储待处理任务的工作队列,可以选择使用无界队列如LinkedBlockingQueue
或有界队列如ArrayBlockingQueue
等。
threadFactory
:创建新线程的工厂,通常用来设置线程名称和优先级等。
handler
:拒绝策略,当线程池和工作队列都满时,如何处理新提交的任务。 内置四种拒绝策略:AbortPolicy(抛出RejectedExecutionException异常)、CallerRunsPolicy(调用者线程自己执行该任务)、DiscardPolicy(直接丢弃任务)和DiscardOldestPolicy(丢弃队列中最旧的一个任务)。
CPU密集型任务,就需要尽量压CPU,参考值可以设为NCPU+1
IO密集型任务,参考值可以设置为2*NCPU
下面是 ThreadPoolExecutor
类中常用方法的含义概述:
方法 | 含义概述 |
---|---|
void execute(Runnable command) | 提交一个任务给线程池执行,任务会在一个线程上执行。 |
Future<?> submit(Runnable task) | 提交一个 Runnable 任务给线程池执行,并返回一个表示该任务的 Future。 |
Future<T> submit(Callable<T> task) | 提交一个 Callable 任务给线程池执行,并返回一个表示该任务的 Future。 |
void shutdown() | 优雅地关闭线程池,该方法会等待所有已提交的任务执行完毕后再关闭线程池。 |
List<Runnable> shutdownNow() | 强制关闭线程池,该方法会尝试取消所有正在执行的任务,并返回未执行的任务列表。 |
boolean isShutdown() | 判断线程池是否已经关闭。 |
boolean isTerminated() | 判断线程池是否已经终止,即所有任务执行完毕并且线程池关闭。 |
boolean awaitTermination(long timeout, TimeUnit unit) | 等待线程池中所有任务执行完毕并且线程池关闭,最多等待指定的时间,如果超时则返回 false。 |
void setCorePoolSize(int corePoolSize) | 设置线程池的核心线程数,核心线程数决定了线程池中的基本线程数量,即使处于空闲状态也不会被销毁。 |
void setMaximumPoolSize(int maximumPoolSize) | 设置线程池的最大线程数,最大线程数决定了线程池中最多可以拥有的线程数量,超过最大线程数的任务会被放入任务队列中等待执行。 |
void setKeepAliveTime(long time, TimeUnit unit) | 设置线程池中空闲线程的存活时间,超过该时间后空闲线程会被销毁,直到线程数等于核心线程数为止。 |
void allowCoreThreadTimeOut(boolean value) | 设置是否允许核心线程超时退出,默认情况下,只有非核心线程才会超时退出。 |
void setThreadFactory(ThreadFactory threadFactory) | 设置用于创建新线程的线程工厂。 |
void setRejectedExecutionHandler(RejectedExecutionHandler handler) | 设置拒绝策略,用于处理任务提交给线程池但无法执行的情况。 |
void setThreadFactory(ThreadFactory threadFactory) | 设置用于创建新线程的线程工厂。 |
BlockingQueue<Runnable> getQueue() | 获取线程池使用的任务队列。 |
int getCorePoolSize() | 获取线程池的核心线程数。 |
int getMaximumPoolSize() | 获取线程池的最大线程数。 |
long getKeepAliveTime(TimeUnit unit) | 获取线程池中空闲线程的存活时间。 |
ThreadFactory getThreadFactory() | 获取用于创建新线程的线程工厂。 |
RejectedExecutionHandler getRejectedExecutionHandler() | 获取拒绝策略,用于处理任务提交给线程池但无法执行的情况。 |
线程池状态方法
方法 | 含义概述 |
---|---|
long getTaskCount() | 返回已经提交给线程池但尚未执行的任务总数。 |
long getCompletedTaskCount() | 返回已经执行完成的任务总数。 |
int getPoolSize() | 返回线程池中当前的线程数量,即当前活跃的线程数量加上正在执行任务的线程数量。 |
int getActiveCount() | 返回线程池中当前正在执行任务的线程数量。 |
这些方法可以用于监控线程池的状态和任务执行情况,例如了解当前任务队列中还有多少任务待执行、已经完成了多少任务、当前线程池中有多少个活跃线程以及有多少个正在执行任务的线程等信息。
Executors工具类
newCachedThreadPool()
:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool(int nThreads)
:创建一个固定大小的线程池,可以同时运行nThreads个任务。
newSingleThreadExecutor()
:创建只有一个线程的线程池,保证同一时刻只有一个任务在执行。
newScheduledThreadPool(int corePoolSize)
:创建一个定长线程池,支持定时及周期性任务执行。
newCachedThreadPool
java
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
newFixedThreadPool
java
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
newSingleThreadExecutor
java
public class ThreadPoolExample3 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}
}
newScheduledThreadPool
3秒后执行
java
public class ThreadPoolExample4 {
public static void main(String[] args) {
log.info("start");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.schedule(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 3, TimeUnit.SECONDS);
executorService.shutdown();
}
}
输出
java
19:37:06.754 [main] INFO cn.diyai.mul_thread.thread_pool.ThreadPoolExample4 - start
19:37:09.776 [pool-1-thread-1] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample4 - schedule run
1秒后每隔3秒执行一次
java
public class ThreadPoolExample7 {
public static void main(String[] args) {
log.info("start");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
scheduleWithFixedDelay
输出
java
19:54:57.841 [main] INFO cn.diyai.mul_thread.thread_pool.ThreadPoolExample7 - start
19:54:58.859 [pool-1-thread-1] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample7 - schedule run
19:55:01.862 [pool-1-thread-1] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample7 - schedule run
19:55:04.871 [pool-1-thread-1] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample7 - schedule run
19:55:07.883 [pool-1-thread-1] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample7 - schedule run
java
public class ThreadPoolExample5 {
public static void main(String[] args) {
log.info("start");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
scheduleAtFixedRate
scheduleAtFixedRate
和 scheduleWithFixedDelay
是 ScheduledExecutorService
接口提供的两种定时任务调度方法,它们之间的区别主要在于调度策略的不同:
scheduleAtFixedRate
方法:- 该方法会按照固定的时间间隔执行任务。
- 任务的执行时间不受上一次任务执行时间的影响,即使前一次任务执行时间超过了指定的时间间隔,也会按照指定的时间间隔执行下一次任务。
- 如果任务执行时间比指定的时间间隔还要长,那么下一次任务会立即开始执行,不会等待前一个任务执行结束。
scheduleWithFixedDelay
方法:- 该方法会在每次任务执行完成后,等待固定的时间间隔后再执行下一次任务。
- 任务的执行时间会受到上一次任务执行时间的影响,即使前一次任务执行时间超过了指定的时间间隔,也会在任务执行完成后等待指定的时间间隔后再执行下一次任务。
- 如果任务执行时间比指定的时间间隔还要长,那么下一次任务会在任务执行完成后立即开始等待固定的时间间隔后再执行。
scheduleAtFixedRate
方法是基于固定时间间隔调度任务的,而 scheduleWithFixedDelay
方法是基于固定等待时间间隔调度任务的。选择使用哪种方法取决于具体的需求和任务执行的特性。
每隔2秒执行一次
java
public class ThreadPoolExample6 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
log.warn("timer run");
}
}, new Date(), 2 * 1000);
}
}
输出
java
19:57:58.626 [Timer-0] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample6 - timer run
19:58:00.632 [Timer-0] WARN cn.diyai.mul_thread.thread_pool.ThreadPoolExample6 - timer run
newWorkStealingPool
用于创建一个基于工作窃取算法的线程池。工作窃取算法是一种并行计算中常用的任务调度算法,它将任务分配给线程池中的工作线程,并且具有动态地从其他线程窃取任务的能力,以确保线程池中的所有线程都能保持忙碌状态,从而充分利用 CPU 资源。
java
public class ThreadPoolExample8 {
public static void main(String[] args) {
log.info("start");
// 创建一个具有4个并行线程的工作窃取线程池
ExecutorService executor = Executors.newWorkStealingPool(4);
// 提交一组任务给线程池执行
for (int i = 0; i < 10; i++) {
int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is executing by thread " + Thread.currentThread().getName());
// 模拟任务执行时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
}
输出
java
20:02:34.245 [main] INFO cn.diyai.mul_thread.thread_pool.ThreadPoolExample8 - start
Task 2 is executing by thread ForkJoinPool-1-worker-3
Task 0 is executing by thread ForkJoinPool-1-worker-1
Task 3 is executing by thread ForkJoinPool-1-worker-0
Task 1 is executing by thread ForkJoinPool-1-worker-2
拒绝策略
若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。
为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。
JDK内置的拒绝策略有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy这4种,默认的拒绝策略在ThreadPoolExecutor中作为内部类提供。在默认的拒绝策略不能满足应用的需求时,可以自定义拒绝策略。
1.AbortPolicy指直接抛出异常,阻止线程正常运行;
2.CallerRunsPolicy指如果被丢弃的线程任务未关闭,则执行该线程任务。注意,CallerRunsPolicy拒绝策略不会真的丢弃任务;
3.DiscardOldestPolicy指移除线程队列中最早的一个线程任务,并尝试提交当前任务;
4.DiscardPolic指丢弃当前的线程任务而不做任何处理。如果系统允许在资源不足的情况下丢弃部分任务,则这将是保障系统安全、稳定的一种很好的方案。
自定义拒绝策略
扩展RejectedExecutionHandler
接口来实现拒绝策略,并捕获异常来实现自定义拒绝策略。
java
import java.util.concurrent.*;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 打印警告信息
System.err.println("Task rejected by custom rejection handler: " + r.toString());
}
public static void main(String[] args) {
// 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为2,使用自定义的拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(2), // 任务队列
new CustomRejectedExecutionHandler() // 自定义拒绝策略
);
// 提交一组任务给线程池执行
for (int i = 0; i < 10; i++) {
int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is executing by thread " + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
}
输出
java
20:09:14.990 [main] ERROR ...CustomRejectedExecutionHandler - Task rejected by custom rejection handler: java.util.concurrent.FutureTask@512ddf17
20:09:14.990 [pool-1-thread-3] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-3 is executing by thread
20:09:14.993 [main] ERROR ...CustomRejectedExecutionHandler - Task rejected by custom rejection handler: java.util.concurrent.FutureTask@3b192d32
20:09:14.993 [main] ERROR ...CustomRejectedExecutionHandler - Task rejected by custom rejection handler: java.util.concurrent.FutureTask@16f65612
20:09:14.993 [main] ERROR ...CustomRejectedExecutionHandler - Task rejected by custom rejection handler: java.util.concurrent.FutureTask@311d617d
20:09:14.990 [pool-1-thread-2] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-2 is executing by thread
20:09:14.990 [pool-1-thread-4] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-4 is executing by thread
20:09:14.990 [pool-1-thread-1] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-1 is executing by thread
20:09:15.104 [pool-1-thread-4] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-4 is executing by thread
20:09:15.104 [pool-1-thread-3] INFO ...CustomRejectedExecutionHandler - Task pool-1-thread-3 is executing by thread
以上代码,拒绝4次,成功执行6次