https://blog.csdn.net/qq271859852/article/details/104860255
方案一
由于线程池的行为是定义在JDK相关代码中,我们想改变其默认行为,很自然的一种想法便是:继承自JDK的线程池类java.util.concurrent.ThreadPoolExecutor,然后改写其execute方法,将判断队列与maximumPoolSize的逻辑顺序调整一下,以达到目的
原来的逻辑如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException();
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 创建新线程 if (addWorker(command, true)) return; c = ctl.get(); } // 代码运行到此处,说明线程池数量达到了corePoolSize if (isRunning(c) && workQueue.offer(command)) { // 将任务成功入队 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 代码运行到此处,说明入队失败 else if (!addWorker(command, false)) // 创建新线程失败则执行拒绝策略 reject(command);
}
但是仔细阅读代码会发现,execute中涉及到的一些关键方法如workerCountOf、addWorker等是私有的,关键变量如ctl、corePoolSize也是私有的,即无法通过简单继承ThreadPoolExecutor改写其execute方法的核心逻辑达到目的。
那考虑的一个变种是,定义一个MyThreadPoolExecutor,把ThreadPoolExecutor的代码照搬过来,只改写其中execute方法,改写后的逻辑如下:
public void execute(Runnable command) { if (command == null)
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 先判断maximumPoolSize if (workerCountOf(c) < maximumPoolSize) { if (addWorker(command, false)) return; c = ctl.get(); } // 再判断队列 else if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (isRunning(c) && !workQueue.offer(command)) reject(command);
}
改写之后,发现reject方法也得重写,原因是RejectedExecutionHandler#rejectedExecution第二个入参是ThreadPoolExecutor,不能传this
这样,连RejectedExecutionHandler也要改写一下
由于RejectedExecutionHandler的改造并非面试题核心逻辑,所以此处省略,明白要表达的意思即可
但这样做之后,与三方框架的兼容就很难了—>有不少三方框架入参是需要ThreadPoolExecutor,而不是自定义的MyThreadPoolExecutor,后续的使用会是个问题
自定义MyThreadPoolExecutor需要代码大篇幅的拷贝,麻烦不说,兼容性还是个问题,从实战出发考虑,可行性很低
方案二
那有没有什么方案能够既省事,又能兼顾兼容性?
两步走:
- 自定义Queue,改写offer逻辑
- 自定义线程池类,继承自ThreadPoolExecutor,改写核心逻辑
自定义Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L; // 自定义的线程池类,继承自ThreadPoolExecutor private EagerThreadPoolExecutor executor; public TaskQueue(int capacity) { super(capacity); } public void setExecutor(EagerThreadPoolExecutor exec) { executor = exec; } // offer方法的含义是:将任务提交到队列中,返回值为true/false,分别代表提交成功/提交失败 @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } // 线程池的当前线程数 int currentPoolThreadSize = executor.getPoolSize(); if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理 return super.offer(runnable); } // return false to let executor create new worker. if (currentPoolThreadSize < executor.getMaximumPoolSize()) { // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程 return false; } // 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中 return super.offer(runnable); } public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException("Executor is shutdown!"); } return super.offer(o, timeout, unit); }
}
自定义线程池类
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/** * 定义一个成员变量,用于记录当前线程池中已提交的任务数量 */ private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public int getSubmittedTaskCount() { return submittedTaskCount.get(); } @Override protected void afterExecute(Runnable r, Throwable t) { // ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1 submittedTaskCount.decrementAndGet(); } @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! // 将池中已提交的任务数 + 1 submittedTaskCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { // retry to offer the task into queue. final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); throw t; } }
}
核心逻辑:
- 当提交任务给EagerThreadPoolExecutor,执行
submittedTaskCount.incrementAndGet();
将池中已提交的任务数 + 1,然后就调用父类的execute方法 - 当执行
workQueue.offer(command)
,走到自定义的TaskQueue#offer逻辑,而offer方法的返回值 决定着是否创建更多的线程 :返回true,代表入队成功,不创建线程;返回false,代表入队失败,需要创建线程 - 当前线程数小于最大线程数就返回false,代表入队失败,需要创建线程
可以从至少两个开源框架上找到答案
Dubbo 2.6.2及以上
其实上边的方案二,代码来自于Dubbo源码,
相关git issue在此: Extension: Eager Thread Pool
Tomcat
Tomcat自定义的线程池类名与JDK的相同,都叫ThreadPoolExecutor,只是包不同,且Tomcat的ThreadPoolExecutor继承自JDK的ThreadPoolExecutor
Tomcat自定义的队列也叫TaskQueue
Tomcat的ThreadPoolExecutor与TaskQueue核心逻辑、思想与方案二贴的代码几乎一致。实际上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鉴的Tomcat设计,关于这一点Dubbo github issue上作者本人也有提及
JDK线程池与Tomcat线程池方案谁最好?
笔者认为,没有哪种方案最好,技术没有银弹,只是在不同视角进行的trade off,在某种场景下最好的方案在另一个场景中可能却导致糟糕的后果。可以从另一个角度考虑:如果有一种放之四海皆准,从各个角度考虑都优于其他技术的存在,那么它的出现必将完全取代它的竞品。而从现实看,显然, JDK线程线与Tomcat线程池都各有场景与发展,并没有出现一方取代另一方的情况,因此不存在哪种方案最好的说法
如果线上环境要使用线程池,哪一种更合适?
线程数与CPU核数、任务类型的关系就不细说了。简单而言,如果不能忍受延迟,期望应用能尽快地为用户提供服务,那么Tomcat线程池可能更适合你;相反,如果你能容忍一些延迟来换取性能上的提升,那么JDK线程池可能会更合适一些
也可以直接包装tomcat线程池,并直接使用tomcat的TaskQueue:
包装线程池:
import com.google.common.base.Stopwatch; import io.micrometer.core.instrument.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.util.threads.TaskQueue;
import java.util.concurrent.*;
/**
自定义线程池
@author zecong.nian
@since 2024/8/5 */ @Slf4j public class MyThreadPoolExecutor extends org.apache.tomcat.util.threads.ThreadPoolExecutor {
/**
- 定义一个成员变量,用于记录当前线程池中已提交的任务数量 */ private String threadPoolName;
/**
- 自定义线程池
- @param corePoolSize 核心线程池数量
- @param maximumPoolSize 最大线程池数量
- @param keepAliveTime 超过核心线程池数量存活时间
- @param unit 超过核心线程池数量存活时间单位
- @param workQueue 存放任务的队列
- @param threadPoolName 可以定义线程池名称
- @param handler 当队列满时执行拒绝策略 */ public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, LinkedBlockingQueue<Runnable> workQueue, String threadPoolName, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(threadPoolName), handler); this.threadPoolName = threadPoolName; if (workQueue instanceof TaskQueue) { ((TaskQueue)workQueue).setParent(this); } }
/**
- 方法执行完毕之后执行
- @param r
- @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r,t); }
@Override public void execute(Runnable command) { super.execute(() -> { Stopwatch timer = Stopwatch.createStarted(); try { command.run(); } finally { // 监控 } }); }
}
调用:
public static void main(String[] args) {
MyThreadPoolExecutor myExecutor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new TaskQueue(queueCapacity), "customThreadPool", new ThreadPoolExecutor.AbortPolicy()); myExecutor.submit(() -> { System.out.println(Thread.currentThread().getName() + ": " + counter.incrementAndGet()); try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } }); }
全部评论