Skip to content

并发模拟代码

并发工具类

java
package cn.diyai.mul_thread;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class ConcurrentExecutor<T> {
    // 线程池
    private ExecutorService executorService;
    // 请求处理器(业务逻辑)
    private Runnable task;
    // 请求总数
    private long requestTotal;
    // 并发线程数
    private int threadTotal;
    // 控制并发的信号量
    private Semaphore semaphore;
    // 计数器,用于等待所有任务完成
    private CountDownLatch countDownLatch;

    public ConcurrentExecutor(int requestTotal, int threadTotal, Runnable task) {
        this.requestTotal = requestTotal;
        this.threadTotal = threadTotal;
        this.task = task;
        this.semaphore = new Semaphore(threadTotal);
        this.countDownLatch = new CountDownLatch(requestTotal);
    }

    public void execute() throws InterruptedException {
        executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < requestTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    task.run();
                    semaphore.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while acquiring semaphore", e);
                } catch (Exception e) {
                    log.error("Exception in task execution", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    /**
     * 提供给外部自定义业务逻辑的方法
     * @param operation 一个Runnable对象,表示待并发执行的操作
     */
    public void setTask(Runnable operation) {
        this.task = operation;
    }

    /**
     * 可选方法,获取当前已完成的任务数
     * @return 已完成的任务数
     */
    public long getCompletedTasks() {
        return requestTotal - countDownLatch.getCount();
    }
}

使用方法

java

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConcurrentExecutorDemo implements Runnable {
    private static int count = 0;

    @Override
    public void run() {
        add();
    }

    private synchronized void add() {
        count++;
        // 这里可以添加具体的业务操作
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentExecutorDemo counterOp = new ConcurrentExecutorDemo();
        ConcurrentExecutor<Void> executor = new ConcurrentExecutor<>(5000, 200, counterOp);
        executor.execute();
        log.info("Finished. Total count: {}", ConcurrentExecutorDemo.count);
    }
}

多次执行,执行结果都是5000

19:37:01.687 [main] INFO cn.diyai.mul_thread.ConcurrentExecutorDemo - Finished. Total count: 5000