主题
J.U.C
AQS
Java并发库(J.U.C,即java.util.concurrent)中的AbstractQueuedSynchronizer(AQS)是一个用于构建锁和同步器框架的抽象类。
AQS提供了线程阻塞、等待队列以及状态管理等基础功能,使得开发者能够基于它实现自定义的线程同步工具。
AQS的核心特性包括:
状态管理:通过一个volatile int类型的成员变量表示同步状态,如锁的状态,可以进行原子性的CAS操作来改变这个状态。
FIFO等待队列:维护了一个FIFO的双向链表作为等待队列,当线程尝试获取同步状态失败时,会被封装成Node节点加入到队列中,按照先来后到的原则排队等待资源。
核心方法:
acquire(int arg)
:用来获取同步状态,如果当前状态不可用则会阻塞线程。release(int arg)
:用来释放同步状态,并唤醒等待队列中的其他线程。tryAcquire(int arg)
和tryRelease(int arg)
:这两个是需要子类重写的方法,分别定义了如何尝试获取和释放同步状态的具体逻辑。
自旋与阻塞结合:AQS支持自旋等待机制,在某些情况下允许线程在未获得锁之前短时间循环尝试(而非立即进入阻塞),以提高并发性能。
可扩展性:开发者可以通过继承AQS并实现其抽象方法来创建自己的同步组件,比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等都基于AQS实现。
AQS是Java并发包中提供的一种底层同步框架,为高级并发构造提供了基础设施支持,简化了编写高效且线程安全代码的工作。
使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
利用了一个int类型表示状态
使用方法是继承
子类通过继承并通过实现它的方法管理其状态{acquire 和release}的方法操纵状态
可以同时实现排它锁和共享锁模式(独占、共享)
常见同步工具类
同步工具 | 特点概述 |
---|---|
CountDownLatch | 用于等待一组线程执行完特定数量的操作后继续执行,计数器递减到零时唤醒等待中的线程。 |
Semaphore | 用于控制同时访问的线程数量,允许多个线程同时访问临界区,但要限制其并发数目,适用于资源有限的情况。 |
CyclicBarrier | 用于多线程间的同步,所有线程在达到屏障点后等待,直到所有线程都到达后一起继续执行。 |
ReentrantLock | 可重入锁,支持独占锁和公平/非公平获取锁,允许同一线程重复获取锁,适用于实现更复杂的同步控制需求。 |
这些是常见的 Java 同步工具,每个工具都有其特定的用途和特点,用于支持不同类型的线程同步和并发控制。
CountDownLatch
用于等待一个或者一组线程完成操作,以便进行下一个操作。
具体应用,可参看并发模拟代码
内部是由一个基于AQS实现的同步器Sync构成,countDown函数用来递减计数,当为0后,则唤醒所有等待的线程,await函数式在计数器为0前一直等待,也可以指定时间实现超时等待,getCount函数可以获取当前计数。
某一线程在开始运行前等待n个线程执行完毕。 将 CountDownLatch 的计数器初始化为n, 每当一个任务线程执行完毕,就将计数器减1(countdownlatch.countDown()),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
实现多个线程开始执行任务的最大并行性。 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。首先初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1(new CountDownLatch(1) ),多个线程在开始执行任务前首先coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
死锁检测:可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
缺点: CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
- 计数器 = 0 唤醒(await)
- 超时等待
常用代码
java
public class CountDownLatchExample {
private static final CountDownLatch latch = new CountDownLatch(4);
private static int data;
public static void main(String[] args) throws InterruptedException {
Thread workerThread = new Thread() {
@Override
public void run() {
for (int i = 1; i < 10; i++) {
data = i;
latch.countDown();
// 使当前线程暂停(随机)一段时间
Tools.randomPause(1000);
}
};
};
workerThread.start();
latch.await();
Debug.info("It's done. data=%d", data);
}
}
超时结束
java
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);// 超时结束
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semaphore
常用于控制并发线程的数量
3个信号量,每次只允许3个线程执行
java
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
输出的日志
xml
16:24:31.066 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 1
16:24:31.066 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 2
16:24:31.066 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 3
16:24:32.072 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 0
16:24:32.073 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 4
16:24:32.073 [pool-1-thread-10] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 9
16:24:33.081 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 6
16:24:33.081 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 7
16:24:33.081 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 8
16:24:34.092 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 5
16:24:34.092 [pool-1-thread-11] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 10
16:24:34.092 [pool-1-thread-12] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 11
16:24:35.106 [pool-1-thread-13] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 12
16:24:35.106 [pool-1-thread-14] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 13
16:24:35.106 [pool-1-thread-15] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 14
16:24:36.120 [pool-1-thread-16] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 15
16:24:36.120 [pool-1-thread-17] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 16
16:24:36.120 [pool-1-thread-19] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 18
16:24:37.130 [pool-1-thread-20] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 19
16:24:37.130 [pool-1-thread-18] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 17
可知,每隔1秒有打印3条记录
一次性获取3个许可和释放3个许可
java
public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
输出
java
16:36:23.135 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 0
16:36:24.150 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 1
16:36:25.161 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 3
16:36:26.175 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 4
16:36:27.175 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 2
16:36:28.187 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 5
16:36:29.203 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 6
16:36:30.204 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 7
16:36:31.217 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 8
16:36:32.227 [pool-1-thread-10] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 9
16:36:33.241 [pool-1-thread-11] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 10
16:36:34.242 [pool-1-thread-14] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 13
16:36:35.253 [pool-1-thread-13] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 12
16:36:36.269 [pool-1-thread-12] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 11
16:36:37.281 [pool-1-thread-15] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 14
16:36:38.295 [pool-1-thread-16] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 15
16:36:39.309 [pool-1-thread-17] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 16
16:36:40.312 [pool-1-thread-19] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 18
16:36:41.327 [pool-1-thread-18] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 17
16:36:42.341 [pool-1-thread-20] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 19
从打印可知,每秒执行一次。即共有3个许可,每次获取3个,此案例单线程执行。
尝试获取许可(tryAcquire)
java
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
输出
16:35:09.409 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 5
16:35:09.409 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 0
16:35:09.409 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 1
可知,只执行了3次
JDK源码
java
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
只执行3秒
java
public class SemaphoreExample4 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
输出
java
16:43:39.472 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 2
16:43:39.472 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 3
16:43:39.472 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 0
16:43:40.478 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 1
16:43:40.478 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 4
16:43:40.478 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 5
16:43:41.490 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 6
16:43:41.490 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 7
16:43:41.490 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 8
从日志可知,只运行了3秒。
JDK源码
java
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
CyclicBarrier
是可循环使用的屏障
用来让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
每个线程调用await方式时就说明已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。
每次5个线程同时执行
java
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);// 每次5个线程同步
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
输出
java
16:47:27.601 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 0 is ready
16:47:28.610 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 1 is ready
16:47:29.623 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 2 is ready
16:47:30.637 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 3 is ready
16:47:31.645 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 4 is ready
16:47:31.645 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 4 continue
16:47:31.645 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 0 continue
16:47:31.645 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 1 continue
16:47:31.645 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 2 continue
16:47:31.645 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 3 continue
16:47:32.654 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 5 is ready
16:47:33.666 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 6 is ready
16:47:34.676 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 7 is ready
16:47:35.690 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 8 is ready
16:47:36.701 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 9 is ready
16:47:36.701 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 5 continue
16:47:36.701 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 6 continue
16:47:36.701 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 8 continue
16:47:36.701 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 9 continue
16:47:36.702 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 7 continue
可知每隔1秒输出5条记录
超时等待
java
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
输出
java
16:59:06.582 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 0 is ready
16:59:07.583 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 1 is ready
16:59:08.594 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 2 is ready
16:59:08.596 [pool-1-thread-1] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:08.596 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:08.596 [pool-1-thread-3] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:08.597 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 2 continue
16:59:08.597 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 1 continue
16:59:08.597 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 0 continue
16:59:09.597 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 3 is ready
16:59:09.597 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:09.597 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 3 continue
16:59:10.609 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 4 is ready
16:59:10.609 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:10.609 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 4 continue
16:59:11.612 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 5 is ready
16:59:11.612 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:11.612 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 5 continue
16:59:12.626 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 6 is ready
16:59:12.626 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:12.626 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 6 continue
16:59:13.641 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 7 is ready
16:59:13.641 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:13.641 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 7 continue
16:59:14.653 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 8 is ready
16:59:14.653 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:14.653 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 8 continue
16:59:15.660 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 9 is ready
16:59:15.660 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
16:59:15.661 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 9 continue
需要捕获BrokenBarrierException
和TimeoutException
异常,避免中断执行
使用Runnable
java
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
输出
java
17:03:22.695 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 0 is ready
17:03:23.706 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 1 is ready
17:03:24.720 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 2 is ready
17:03:25.733 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 3 is ready
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 4 is ready
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - callback is running
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 4 continue
17:03:26.748 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 0 continue
17:03:26.748 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 1 continue
17:03:26.748 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 2 continue
17:03:26.748 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 3 continue
17:03:27.761 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 5 is ready
17:03:28.777 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 6 is ready
17:03:29.778 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 7 is ready
17:03:30.790 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 8 is ready
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 9 is ready
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - callback is running
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 9 continue
17:03:31.792 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 5 continue
17:03:31.792 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 6 continue
17:03:31.792 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 7 continue
17:03:31.792 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 8 continue
可知,每ready完成都会执行callback
与CountDownLatch的区别
CyclicBarriar可以实现让一组线程等待某个状态后再全部同时执行。可以重复使用,但CountDownLatch不可以。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。
而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
- CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。
- CountDownLatch是一个或者多个线程,等待其他多个线程完成某件事情之后才能执行。CyclicBarrier 是 多个线程互相等待,直到到达同一个同步点,再继续一起执行。对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
- CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
ReentrantLock
使用方法
java
public class ReentrantLockDemo {
// 1
private final Lock lock = new ReentrantLock();
public void method1() {
// 2
lock.lock();
try {
doSomething();
} finally {
// 3
lock.unlock();
}
}
private void doSomething() {
log.info("访问共享数据");
}
public static void main(String[] args) {
ReentrantLockDemo demo = new ReentrantLockDemo();
demo.method1();
}
}
lock
获取锁,unlock
释放锁
与synchronized的区别
特性 | ReentrantLock | synchronized |
---|---|---|
类型 | 类(java.util.concurrent.locks.ReentrantLock ) | 关键字 |
锁获取和释放 | 必须显式调用 lock() 和 unlock() 方法进行获取和释放锁 | 隐式获取和释放锁,进入和退出 synchronized 块时自动获取和释放锁 |
中断支持 | 支持中断,lockInterruptibly() 方法可以响应中断 | 不支持中断,进入 synchronized 块时无法响应中断 |
公平性 | 可以选择公平或非公平模式 | 非公平锁 |
条件变量 | 支持 Condition 条件变量,可以使用 newCondition() 方法创建 | 不支持条件变量 |
性能 | 竞争激烈时性能可能优于 synchronized | 竞争不激烈时性能可能优于 ReentrantLock ,但在高并发场景下可能性能稍逊一筹 |
可重入性 | 支持可重入,同一个线程可以重复获取锁 | 支持可重入,同一个线程可以重复进入同一个 synchronized 块 |
适用范围 | 适用于需要更灵活的锁控制、公平性选择、中断支持和条件变量的情况 | 适用于简单的同步代码块 |
ReentrantReadWriteLock
代码示例
java
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
测试用例
java
import java.util.Set;
import junit.framework.TestCase;
import org.junit.Before;
import org.junit.Test;
public class LockExample3Test extends TestCase {
private LockExample3 lockExample3;
@Before
public void setUp() {
lockExample3 = new LockExample3();
}
@Test
public void testPutAndGet() {
String key = "testKey";
LockExample3.Data testData = lockExample3.new Data();
assertNull(lockExample3.get(key));
lockExample3.put(key, testData);
assertEquals(testData, lockExample3.get(key));
}
@Test
public void testGetAllKeys() {
String key1 = "key1";
String key2 = "key2";
String key3 = "key3";
LockExample3.Data data1 = lockExample3.new Data();
LockExample3.Data data2 = lockExample3.new Data();
LockExample3.Data data3 = lockExample3.new Data();
lockExample3.put(key1, data1);
lockExample3.put(key2, data2);
lockExample3.put(key3, data3);
Set<String> keys = lockExample3.getAllKeys();
assertEquals(3, keys.size());
assertEquals(true, keys.contains(key1));
assertEquals(true, keys.contains(key2));
assertEquals(true, keys.contains(key3));
}
}
StampedLock
使用方法
java
public class LockExample7 implements Runnable{
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws Exception {
LockExample7 demo = new LockExample7();
ConcurrentExecutor concurrentExecutor = new ConcurrentExecutor(5000,200,demo);
concurrentExecutor.execute();
log.info("count = {}",count);
}
private void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
@Override
public void run() {
add();
}
}
Condition
演示了使用 ReentrantLock
和 Condition
来实现线程间的等待和通知机制
java
public class LockExample6 {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
try {
reentrantLock.lock();
log.info("{} wait signal",Thread.currentThread().getName()); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{} get signal",Thread.currentThread().getName()); // 4
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock();
log.info("{} get lock",Thread.currentThread().getName()); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("{} send signal! ",Thread.currentThread().getName()); // 3
reentrantLock.unlock();
}).start();
}
}
输出
java
17:44:31.555 [Thread-0] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-0 wait signal
17:44:31.557 [Thread-1] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-1 get lock
17:44:34.563 [Thread-1] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-1 send signal!
17:44:34.563 [Thread-0] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-0 get signal
一个线程等待其他线程发送信号,而另一个线程发送信号通知等待中的线程
FutureTask
Future的用法
java
public class FutureExample {
static class MyCallable implements Callable<String> { // 1
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(3000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();// 2
log.info("result:{}", result);
executorService.shutdown();
}
}
输出
java
18:00:57.973 [main] INFO cn.diyai.mul_thread.aqs.future.FutureExample - do something in main
18:00:57.973 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.future.FutureExample - do something in callable
18:01:00.975 [main] INFO cn.diyai.mul_thread.aqs.future.FutureExample - result:Done
FutureTask的用法
java
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
输出
java
18:02:12.776 [main] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - do something in main
18:02:12.776 [Thread-0] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - do something in callable
18:02:15.788 [main] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - result:Done
ForkJoin
基于工作窃取算法
java
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4...100
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
输出
java
18:05:07.554 [main] INFO cn.diyai.mul_thread.aqs.ForkJoinTaskExample - result:5050
BlockingQueue
常用方法
方法 | 含义概述 |
---|---|
boolean add(E e) | 将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则抛出 IllegalStateException 异常。 |
boolean offer(E e) | 将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则返回 false。 |
void put(E e) | 将指定的元素插入此队列中,如果队列已满,则阻塞等待直到有空间可用。 |
boolean offer(E e, long timeout, TimeUnit unit) | 将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则阻塞等待指定时间,超时返回 false。 |
E remove() | 检索并删除此队列的头部,如果队列为空,则抛出 NoSuchElementException 异常。 |
E poll() | 检索并删除此队列的头部,如果队列为空,则返回 null。 |
E take() | 检索并删除此队列的头部,如果队列为空,则阻塞等待直到有元素可用。 |
E poll(long timeout, TimeUnit unit) | 检索并删除此队列的头部,如果队列为空,则阻塞等待指定时间,超时返回 null。 |
E element() | 检索但不删除此队列的头部,如果队列为空,则抛出 NoSuchElementException 异常。 |
E peek() | 检索但不删除此队列的头部,如果队列为空,则返回 null。 |
常见的阻塞队列
队列类型 | 特点概述 |
---|---|
ArrayBlockingQueue | 基于数组实现的有界阻塞队列,先进先出(FIFO),支持指定容量,插入元素时若队列已满则阻塞,删除元素时若队列为空则阻塞。 |
DelayQueue | 延迟队列,元素按照延迟时间的顺序进行消费,实现了 Delayed 接口的元素会按照其设定的延迟时间进行排序,支持延迟获取元素的操作。 |
LinkedBlockingQueue | 基于链表实现的可选有界或无界阻塞队列,先进先出(FIFO),支持指定容量,插入元素时若队列已满则阻塞,删除元素时若队列为空则阻塞。 |
PriorityBlockingQueue | 优先级阻塞队列,元素根据自然顺序或者自定义的比较器进行排序,插入元素时按照优先级进行排序,支持按优先级获取元素的操作,是无界队列。 |
SynchronousQueue | 同步队列,不存储元素,用于直接将生产者线程产生的元素传递给消费者线程,插入操作需要等待消费者取走元素,反之亦然,是一个没有存储元素的阻塞队列,每次插入/删除操作都必须有对应的消费者/生产者线程。 |