Skip to content

J.U.C

AQS

Java并发库(J.U.C,即java.util.concurrent)中的AbstractQueuedSynchronizer(AQS)是一个用于构建锁和同步器框架的抽象类。

AQS提供了线程阻塞、等待队列以及状态管理等基础功能,使得开发者能够基于它实现自定义的线程同步工具。

AQS的核心特性包括:

  1. 状态管理:通过一个volatile int类型的成员变量表示同步状态,如锁的状态,可以进行原子性的CAS操作来改变这个状态。

  2. FIFO等待队列:维护了一个FIFO的双向链表作为等待队列,当线程尝试获取同步状态失败时,会被封装成Node节点加入到队列中,按照先来后到的原则排队等待资源。

  3. 核心方法

    • acquire(int arg):用来获取同步状态,如果当前状态不可用则会阻塞线程。
    • release(int arg):用来释放同步状态,并唤醒等待队列中的其他线程。
    • tryAcquire(int arg)tryRelease(int arg):这两个是需要子类重写的方法,分别定义了如何尝试获取和释放同步状态的具体逻辑。
  4. 自旋与阻塞结合:AQS支持自旋等待机制,在某些情况下允许线程在未获得锁之前短时间循环尝试(而非立即进入阻塞),以提高并发性能。

  5. 可扩展性:开发者可以通过继承AQS并实现其抽象方法来创建自己的同步组件,比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等都基于AQS实现。

AQS是Java并发包中提供的一种底层同步框架,为高级并发构造提供了基础设施支持,简化了编写高效且线程安全代码的工作。

使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架

利用了一个int类型表示状态

使用方法是继承

子类通过继承并通过实现它的方法管理其状态{acquire 和release}的方法操纵状态

可以同时实现排它锁和共享锁模式(独占、共享)

常见同步工具类

同步工具特点概述
CountDownLatch用于等待一组线程执行完特定数量的操作后继续执行,计数器递减到零时唤醒等待中的线程。
Semaphore用于控制同时访问的线程数量,允许多个线程同时访问临界区,但要限制其并发数目,适用于资源有限的情况。
CyclicBarrier用于多线程间的同步,所有线程在达到屏障点后等待,直到所有线程都到达后一起继续执行。
ReentrantLock可重入锁,支持独占锁和公平/非公平获取锁,允许同一线程重复获取锁,适用于实现更复杂的同步控制需求。

这些是常见的 Java 同步工具,每个工具都有其特定的用途和特点,用于支持不同类型的线程同步和并发控制。

CountDownLatch

用于等待一个或者一组线程完成操作,以便进行下一个操作。

具体应用,可参看并发模拟代码

内部是由一个基于AQS实现的同步器Sync构成,countDown函数用来递减计数,当为0后,则唤醒所有等待的线程,await函数式在计数器为0前一直等待,也可以指定时间实现超时等待,getCount函数可以获取当前计数。

  1. 某一线程在开始运行前等待n个线程执行完毕。 将 CountDownLatch 的计数器初始化为n, 每当一个任务线程执行完毕,就将计数器减1(countdownlatch.countDown()),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

  2. 实现多个线程开始执行任务的最大并行性。 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。首先初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1(new CountDownLatch(1) ),多个线程在开始执行任务前首先coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

  3. 死锁检测:可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

缺点: CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

  1. 计数器 = 0 唤醒(await)
  2. 超时等待

常用代码

java
public class CountDownLatchExample {
  private static final CountDownLatch latch = new CountDownLatch(4);
  private static int data;

  public static void main(String[] args) throws InterruptedException {
    Thread workerThread = new Thread() {
      @Override
      public void run() {
        for (int i = 1; i < 10; i++) {
          data = i;
          latch.countDown();
          // 使当前线程暂停(随机)一段时间
          Tools.randomPause(1000);
        }

      };
    };
    workerThread.start();
    latch.await();
    Debug.info("It's done. data=%d", data);
  }
}

超时结束

java
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);// 超时结束
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

Semaphore

常用于控制并发线程的数量

3个信号量,每次只允许3个线程执行

java
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放一个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

输出的日志

xml
16:24:31.066 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 1
16:24:31.066 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 2
16:24:31.066 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 3

16:24:32.072 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 0
16:24:32.073 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 4
16:24:32.073 [pool-1-thread-10] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 9

16:24:33.081 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 6
16:24:33.081 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 7
16:24:33.081 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 8

16:24:34.092 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 5
16:24:34.092 [pool-1-thread-11] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 10
16:24:34.092 [pool-1-thread-12] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 11

16:24:35.106 [pool-1-thread-13] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 12
16:24:35.106 [pool-1-thread-14] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 13
16:24:35.106 [pool-1-thread-15] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 14

16:24:36.120 [pool-1-thread-16] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 15
16:24:36.120 [pool-1-thread-17] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 16
16:24:36.120 [pool-1-thread-19] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 18

16:24:37.130 [pool-1-thread-20] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 19
16:24:37.130 [pool-1-thread-18] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample1 - 17

可知,每隔1秒有打印3条记录

一次性获取3个许可和释放3个许可

java
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(3); // 获取多个许可
                    test(threadNum);
                    semaphore.release(3); // 释放多个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

输出

java
16:36:23.135 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 0
16:36:24.150 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 1
16:36:25.161 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 3
16:36:26.175 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 4
16:36:27.175 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 2
16:36:28.187 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 5
16:36:29.203 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 6
16:36:30.204 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 7
16:36:31.217 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 8
16:36:32.227 [pool-1-thread-10] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 9
16:36:33.241 [pool-1-thread-11] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 10
16:36:34.242 [pool-1-thread-14] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 13
16:36:35.253 [pool-1-thread-13] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 12
16:36:36.269 [pool-1-thread-12] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 11
16:36:37.281 [pool-1-thread-15] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 14
16:36:38.295 [pool-1-thread-16] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 15
16:36:39.309 [pool-1-thread-17] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 16
16:36:40.312 [pool-1-thread-19] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 18
16:36:41.327 [pool-1-thread-18] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 17
16:36:42.341 [pool-1-thread-20] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample2 - 19

从打印可知,每秒执行一次。即共有3个许可,每次获取3个,此案例单线程执行

尝试获取许可(tryAcquire)

java
public class SemaphoreExample3 {

    private final static int threadCount = 20;
    
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

输出

16:35:09.409 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 5
16:35:09.409 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 0
16:35:09.409 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample3 - 1

可知,只执行了3次

JDK源码

java
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

只执行3秒

java
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

输出

java
16:43:39.472 [pool-1-thread-3] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 2
16:43:39.472 [pool-1-thread-4] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 3
16:43:39.472 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 0
    
16:43:40.478 [pool-1-thread-2] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 1
16:43:40.478 [pool-1-thread-5] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 4
16:43:40.478 [pool-1-thread-6] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 5
    
16:43:41.490 [pool-1-thread-7] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 6
16:43:41.490 [pool-1-thread-8] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 7
16:43:41.490 [pool-1-thread-9] INFO cn.diyai.mul_thread.aqs.semaphore.SemaphoreExample4 - 8

从日志可知,只运行了3秒。

JDK源码

java
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

CyclicBarrier

可循环使用的屏障

用来让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

每个线程调用await方式时就说明已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。

每次5个线程同时执行

java
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);// 每次5个线程同步

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

输出

java
16:47:27.601 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 0 is ready
16:47:28.610 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 1 is ready
16:47:29.623 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 2 is ready
16:47:30.637 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 3 is ready
16:47:31.645 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 4 is ready
16:47:31.645 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 4 continue
16:47:31.645 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 0 continue
16:47:31.645 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 1 continue
16:47:31.645 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 2 continue
16:47:31.645 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 3 continue
16:47:32.654 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 5 is ready
16:47:33.666 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 6 is ready
16:47:34.676 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 7 is ready
16:47:35.690 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 8 is ready
16:47:36.701 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 9 is ready
16:47:36.701 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 5 continue
16:47:36.701 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 6 continue
16:47:36.701 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 8 continue
16:47:36.701 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 9 continue
16:47:36.702 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample1 - 7 continue

可知每隔1秒输出5条记录

超时等待

java
public class CyclicBarrierExample2 {
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}

输出

java
16:59:06.582 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 0 is ready
16:59:07.583 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 1 is ready
16:59:08.594 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 2 is ready
16:59:08.596 [pool-1-thread-1] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:08.596 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:08.596 [pool-1-thread-3] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:08.597 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 2 continue
16:59:08.597 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 1 continue
16:59:08.597 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 0 continue
16:59:09.597 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 3 is ready
16:59:09.597 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:09.597 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 3 continue
16:59:10.609 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 4 is ready
16:59:10.609 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:10.609 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 4 continue
16:59:11.612 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 5 is ready
16:59:11.612 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:11.612 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 5 continue
16:59:12.626 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 6 is ready
16:59:12.626 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:12.626 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 6 continue
16:59:13.641 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 7 is ready
16:59:13.641 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:13.641 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 7 continue
16:59:14.653 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 8 is ready
16:59:14.653 [pool-1-thread-2] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:14.653 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 8 continue
16:59:15.660 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 9 is ready
16:59:15.660 [pool-1-thread-4] WARN cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.race(CyclicBarrierExample2.java:36)
	at cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
16:59:15.661 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample2 - 9 continue

需要捕获BrokenBarrierExceptionTimeoutException异常,避免中断执行

使用Runnable

java
public class CyclicBarrierExample3 {
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

输出

java
17:03:22.695 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 0 is ready
17:03:23.706 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 1 is ready
17:03:24.720 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 2 is ready
17:03:25.733 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 3 is ready
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 4 is ready
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - callback is running
17:03:26.748 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 4 continue
17:03:26.748 [pool-1-thread-1] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 0 continue
17:03:26.748 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 1 continue
17:03:26.748 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 2 continue
17:03:26.748 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 3 continue
17:03:27.761 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 5 is ready
17:03:28.777 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 6 is ready
17:03:29.778 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 7 is ready
17:03:30.790 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 8 is ready
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 9 is ready
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - callback is running
17:03:31.792 [pool-1-thread-2] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 9 continue
17:03:31.792 [pool-1-thread-6] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 5 continue
17:03:31.792 [pool-1-thread-4] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 6 continue
17:03:31.792 [pool-1-thread-3] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 7 continue
17:03:31.792 [pool-1-thread-5] INFO cn.diyai.mul_thread.cyclic_barrier.CyclicBarrierExample3 - 8 continue

可知,每ready完成都会执行callback

与CountDownLatch的区别

CyclicBarriar可以实现让一组线程等待某个状态后再全部同时执行。可以重复使用,但CountDownLatch不可以。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。

而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

  1. CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。
  2. CountDownLatch是一个或者多个线程,等待其他多个线程完成某件事情之后才能执行。CyclicBarrier 是 多个线程互相等待,直到到达同一个同步点,再继续一起执行。对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
  3. CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

ReentrantLock

使用方法

java
public class ReentrantLockDemo {
    // 1
    private final Lock lock = new ReentrantLock();

    public void method1() {
        // 2
        lock.lock();
        try {
            doSomething();
        } finally {
            // 3
            lock.unlock();
        }
    }

    private void doSomething() {
        log.info("访问共享数据");
    }

    public static void main(String[] args) {
        ReentrantLockDemo demo = new ReentrantLockDemo();
        demo.method1();
    }
}

lock获取锁,unlock释放锁

与synchronized的区别

特性ReentrantLocksynchronized
类型类(java.util.concurrent.locks.ReentrantLock关键字
锁获取和释放必须显式调用 lock()unlock() 方法进行获取和释放锁隐式获取和释放锁,进入和退出 synchronized 块时自动获取和释放锁
中断支持支持中断,lockInterruptibly() 方法可以响应中断不支持中断,进入 synchronized 块时无法响应中断
公平性可以选择公平或非公平模式非公平锁
条件变量支持 Condition 条件变量,可以使用 newCondition() 方法创建不支持条件变量
性能竞争激烈时性能可能优于 synchronized竞争不激烈时性能可能优于 ReentrantLock,但在高并发场景下可能性能稍逊一筹
可重入性支持可重入,同一个线程可以重复获取锁支持可重入,同一个线程可以重复进入同一个 synchronized
适用范围适用于需要更灵活的锁控制、公平性选择、中断支持和条件变量的情况适用于简单的同步代码块

ReentrantReadWriteLock

代码示例

java
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    class Data {

    }
}

测试用例

java

import java.util.Set;

import junit.framework.TestCase;
import org.junit.Before;
import org.junit.Test;
public class LockExample3Test extends TestCase {
    private LockExample3 lockExample3;

    @Before
    public void setUp() {
        lockExample3 = new LockExample3();
    }

    @Test
    public void testPutAndGet() {
        String key = "testKey";
        LockExample3.Data testData = lockExample3.new Data();

        assertNull(lockExample3.get(key));

        lockExample3.put(key, testData);
        assertEquals(testData, lockExample3.get(key));
    }


    @Test
    public void testGetAllKeys() {
        String key1 = "key1";
        String key2 = "key2";
        String key3 = "key3";
        LockExample3.Data data1 = lockExample3.new Data();
        LockExample3.Data data2 = lockExample3.new Data();
        LockExample3.Data data3 = lockExample3.new Data();

        lockExample3.put(key1, data1);
        lockExample3.put(key2, data2);
        lockExample3.put(key3, data3);

        Set<String> keys = lockExample3.getAllKeys();
        assertEquals(3, keys.size());
        assertEquals(true, keys.contains(key1));
        assertEquals(true, keys.contains(key2));
        assertEquals(true, keys.contains(key3));
    }
}

StampedLock

使用方法

java
public class LockExample7 implements Runnable{

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws Exception {
        LockExample7 demo = new LockExample7();
        ConcurrentExecutor concurrentExecutor = new ConcurrentExecutor(5000,200,demo);
        concurrentExecutor.execute();
        log.info("count = {}",count);
    }
    private void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }

    @Override
    public void run() {
        add();
    }
}

Condition

演示了使用 ReentrantLockCondition 来实现线程间的等待和通知机制

java
public class LockExample6 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
                reentrantLock.lock();
                log.info("{} wait signal",Thread.currentThread().getName()); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} get signal",Thread.currentThread().getName()); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();
            log.info("{} get lock",Thread.currentThread().getName()); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("{} send signal! ",Thread.currentThread().getName()); // 3
            reentrantLock.unlock();
        }).start();
    }
}

输出

java
17:44:31.555 [Thread-0] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-0 wait signal
17:44:31.557 [Thread-1] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-1 get lock
17:44:34.563 [Thread-1] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-1 send signal!
17:44:34.563 [Thread-0] INFO cn.diyai.mul_thread.lock.LockExample6 - Thread-0 get signal

一个线程等待其他线程发送信号,而另一个线程发送信号通知等待中的线程

FutureTask

Future的用法

java
public class FutureExample {

    static class MyCallable implements Callable<String> { // 1

        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(3000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();// 2
        log.info("result:{}", result);
        executorService.shutdown();
    }
}

输出

java
18:00:57.973 [main] INFO cn.diyai.mul_thread.aqs.future.FutureExample - do something in main
18:00:57.973 [pool-1-thread-1] INFO cn.diyai.mul_thread.aqs.future.FutureExample - do something in callable
18:01:00.975 [main] INFO cn.diyai.mul_thread.aqs.future.FutureExample - result:Done

FutureTask的用法

java
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}

输出

java
18:02:12.776 [main] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - do something in main
18:02:12.776 [Thread-0] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - do something in callable
18:02:15.788 [main] INFO cn.diyai.mul_thread.aqs.future.FutureTaskExample - result:Done

ForkJoin

基于工作窃取算法

java
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4...100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

输出

java
18:05:07.554 [main] INFO cn.diyai.mul_thread.aqs.ForkJoinTaskExample - result:5050

BlockingQueue

常用方法

方法含义概述
boolean add(E e)将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则抛出 IllegalStateException 异常。
boolean offer(E e)将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则返回 false。
void put(E e)将指定的元素插入此队列中,如果队列已满,则阻塞等待直到有空间可用。
boolean offer(E e, long timeout, TimeUnit unit)将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功返回 true,否则阻塞等待指定时间,超时返回 false。
E remove()检索并删除此队列的头部,如果队列为空,则抛出 NoSuchElementException 异常。
E poll()检索并删除此队列的头部,如果队列为空,则返回 null。
E take()检索并删除此队列的头部,如果队列为空,则阻塞等待直到有元素可用。
E poll(long timeout, TimeUnit unit)检索并删除此队列的头部,如果队列为空,则阻塞等待指定时间,超时返回 null。
E element()检索但不删除此队列的头部,如果队列为空,则抛出 NoSuchElementException 异常。
E peek()检索但不删除此队列的头部,如果队列为空,则返回 null。

常见的阻塞队列

队列类型特点概述
ArrayBlockingQueue基于数组实现的有界阻塞队列,先进先出(FIFO),支持指定容量,插入元素时若队列已满则阻塞,删除元素时若队列为空则阻塞。
DelayQueue延迟队列,元素按照延迟时间的顺序进行消费,实现了 Delayed 接口的元素会按照其设定的延迟时间进行排序,支持延迟获取元素的操作。
LinkedBlockingQueue基于链表实现的可选有界或无界阻塞队列,先进先出(FIFO),支持指定容量,插入元素时若队列已满则阻塞,删除元素时若队列为空则阻塞。
PriorityBlockingQueue优先级阻塞队列,元素根据自然顺序或者自定义的比较器进行排序,插入元素时按照优先级进行排序,支持按优先级获取元素的操作,是无界队列。
SynchronousQueue同步队列,不存储元素,用于直接将生产者线程产生的元素传递给消费者线程,插入操作需要等待消费者取走元素,反之亦然,是一个没有存储元素的阻塞队列,每次插入/删除操作都必须有对应的消费者/生产者线程。