主题
并发模拟代码
并发工具类
java
package cn.diyai.mul_thread;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Concurrency simulation helper: runs one {@link Runnable} {@code requestTotal} times on a
 * cached thread pool while a {@link Semaphore} caps the number of invocations that execute
 * at the same moment at {@code threadTotal}.
 *
 * <p>{@link #execute()} blocks the caller until every request has finished. Exceptions thrown
 * by the task are logged and do not abort the remaining requests. The class is not written
 * for overlapping {@code execute()} calls on the same instance: each call replaces the latch
 * that {@link #getCompletedTasks()} reads.
 *
 * @param <T> not used by this class; kept so existing {@code ConcurrentExecutor<...>}
 *            declarations continue to compile
 */
@Slf4j
public class ConcurrentExecutor<T> {
    // Thread pool; a fresh cached pool is created by every execute() call.
    private ExecutorService executorService;
    // Request handler (the business logic run once per request).
    private Runnable task;
    // Total number of requests to issue.
    private final long requestTotal;
    // Maximum number of requests allowed to run the task at the same time.
    private final int threadTotal;
    // Semaphore that enforces the concurrency limit.
    private final Semaphore semaphore;
    // Latch used to wait for all requests of the current run; volatile because execute()
    // replaces it and getCompletedTasks() may be polled from another thread.
    private volatile CountDownLatch countDownLatch;

    /**
     * Creates an executor for a fixed number of requests and a fixed concurrency limit.
     *
     * @param requestTotal number of times the task is run by one {@link #execute()} call
     * @param threadTotal  number of semaphore permits, i.e. how many requests may run the
     *                     task simultaneously
     * @param task         the operation to run for every request
     * @throws IllegalArgumentException if {@code requestTotal} is negative (raised by
     *                                  {@link CountDownLatch})
     */
    public ConcurrentExecutor(int requestTotal, int threadTotal, Runnable task) {
        this.requestTotal = requestTotal;
        this.threadTotal = threadTotal;
        this.task = task;
        this.semaphore = new Semaphore(threadTotal);
        this.countDownLatch = new CountDownLatch(requestTotal);
    }

    /**
     * Submits all requests, waits until each of them has finished, then shuts the pool down.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting; the
     *                              pool is still shut down in that case
     */
    public void execute() throws InterruptedException {
        // A CountDownLatch cannot be reset. Re-using the spent latch of a previous run would
        // make await() return immediately, so every run gets its own latch.
        // The cast is lossless: requestTotal was assigned from an int in the constructor.
        final CountDownLatch latch = new CountDownLatch((int) requestTotal);
        countDownLatch = latch;

        final ExecutorService pool = Executors.newCachedThreadPool();
        executorService = pool;
        try {
            for (long i = 0; i < requestTotal; i++) {
                pool.execute(() -> runSingleRequest(latch));
            }
            latch.await();
        } finally {
            // Also reached when await() is interrupted, so the pool is never left open.
            pool.shutdown();
        }
    }

    /** Runs the task once under the semaphore and always reports completion to the latch. */
    private void runSingleRequest(CountDownLatch latch) {
        try {
            semaphore.acquire();
            try {
                task.run();
            } finally {
                // Return the permit even when the task throws; otherwise failing tasks
                // would drain the semaphore and block all remaining requests forever.
                semaphore.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while acquiring semaphore", e);
        } catch (Exception e) {
            log.error("Exception in task execution", e);
        } finally {
            // Count down on every exit path (including Errors) so execute() cannot hang.
            latch.countDown();
        }
    }

    /**
     * Lets callers supply their own business logic.
     *
     * @param operation a {@link Runnable} representing the operation to execute concurrently
     */
    public void setTask(Runnable operation) {
        this.task = operation;
    }

    /**
     * Optional helper: reports how many requests of the current (or most recent) run have
     * finished, whether they succeeded or failed.
     *
     * @return number of finished requests
     */
    public long getCompletedTasks() {
        return requestTotal - countDownLatch.getCount();
    }
}
使用方法
java
import lombok.extern.slf4j.Slf4j;
/**
 * Usage example for {@code ConcurrentExecutor}: issues 5000 requests with at most 200 of them
 * running at once, each request incrementing a shared counter, then logs the final count.
 */
@Slf4j
public class ConcurrentExecutorDemo implements Runnable {
    // Counter shared by every instance of this class; modified only through add().
    private static int count = 0;

    /** Performs one request: a single increment of the shared counter. */
    @Override
    public void run() {
        add();
    }

    /**
     * Increments the shared counter by one.
     *
     * <p>The method is static so that it locks on the class object. {@code count} is a static
     * field, so an instance-level lock would only protect it while every caller happens to
     * share the same instance.
     */
    private static synchronized void add() {
        count++;
        // Concrete business operations can be added here.
    }

    /**
     * Runs the demo and logs the resulting count.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *                              requests to finish
     */
    public static void main(String[] args) throws InterruptedException {
        ConcurrentExecutorDemo counterOp = new ConcurrentExecutorDemo();
        ConcurrentExecutor<Void> executor = new ConcurrentExecutor<>(5000, 200, counterOp);
        executor.execute();
        log.info("Finished. Total count: {}", ConcurrentExecutorDemo.count);
    }
}
多次执行，结果始终是 5000：add() 方法由 synchronized 修饰，count++ 不会出现并发丢失更新。
19:37:01.687 [main] INFO cn.diyai.mul_thread.ConcurrentExecutorDemo - Finished. Total count: 5000