private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// ... ...for (;;) {// ... ...// 通过 CAS 自旋,增加线程数 +1,增加成功跳出双层循环,继续往下执行if (compareAndIncrementWorkerCount(c))break retry;// ... ...}}// 到这儿,说明已经成功的将线程数 +1 了,但是真正的线程还没有被添加boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 添加线程,Worker 是继承了 AQS,实现了 Runnable 接口的包装类w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// ... ...// 添加新增的 Workerworkers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;// ... ...} finally {mainLock.unlock();}if (workerAdded) {// 启动 Workert.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
为了简明扼要,方法酌情进行了删减 。addWorker 方法主要是通过双重 for 循环进行线程数 +1,然后创建 Worker,并进行添加到 HashSet<Worker> workers 列表中,然后调用 t.start() 启动 Worker 。
那么接下来再一起看看 Worker 里面都做了啥?
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {// ... ...final Thread thread;Runnable firstTask;/*** 通过指定的 firstTask 任务创建 Worker 对象*/Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;// 通过当前 Worker 对象创建对应的线程对象 t,// 所以调用 t.start() 时最终会调用 Worker 的 run 方法this.thread = getThreadFactory().newThread(this);}public void run() {// run 方法最终会调用 ThreadPoolExecutor 的 runWorker 方法runWorker(this);}// ... ...}
通过 Worker 的构造函数能够了解到,会通过创建的 Worker 对象去构建线程对象,当线程对象启动时最终会调用 runWorker 方法 。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 取出需要执行的任务Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 不是 null 或者去 workQueue 队列中取到待执行的任务不为 nullwhile (task != null || (task = getTask()) != null) {// ... ...try {// 开始执行任务前的钩子方法beforeExecute(wt, task);Throwable thrown = null;try {task.run();// ... ...} finally {// 任务执行后的钩子方法afterExecute(task, thrown);}} finally {// ... ...}}completedAbruptly = false;} finally {// Worker 退出processWorkerExit(w, completedAbruptly);}}
runWorker 方法,首先会取出要执行的任务 task,如果为空则会调用 getTask 方法从任务队列中获取,然后调用任务对应的 run 方法进行执行,另外预置了 beforeExecute、afterExecute 两个钩子函数,让研发人员监控线程执行成为可能 。
另外,线程池中的线程如何从队列中获取待执行的任务的呢?走进 getTask 方法看一看 。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?// 这块体现了:线程池的线程是复用的,通过循环去获取队列中的任务去执行 。for (;;) {int c = ctl.get();// ... ...int wc = workerCountOf(c);// allowCoreThreadTimeOut: 是否允许核心线程超时.// 如果设置为 false,那么线程池在达到 corePoolSize 个工作线程之前,不会让闲置的工作线程退出 。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// ... ...try {// 从 workQueue 队列中取待执行的任务,根据 timed 来选择等待时间Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
为了便于理解,源码做了部分删减 。重点关注从任务队列中获取待执行任务的对象的方法调用:workQueue.poll()、workQueue.take() ,前者是移除并返回队列中的头部元素,如果队列为空,则返回 null,而后者是移除并返回队列中的头部元素,如果队列为空,则阻塞 。
烟未灭,酒过半 ... ... 源码探讨就谈到这儿... ...
源码揭秘之后的反思(一)钩子函数的使用场景
场景一:
![Java线程池深度揭秘](http://img.jiangsulong.com/220420/2133256216-5.jpg)
文章插图
如上面自定义的 MyThreadPoolExecutor,可以让日志打印线程及线程数等等信息 。意味着研发人员可以扩展 ThreadPoolExecutor,对钩子函数 beforeExecute、afterExecute 进行实现,进而可以知晓线程池内部的调度细节,可以有效进行监控,针对故障排查应该很有帮助 。
场景二:
AbstractExecutorService 并没有实现 execute 方法,而是为子类 ThreadPoolExecutor 留了个口子,让子类去灵活扩展(钩子函数) 。
推荐阅读
- AI人工智能:JAVA教你拍照识别文字 并语音播报
- JAVA程序员常用的几个工具类
- JAVA并发-AtomicInteger
- Java正则表达式详解
- 美团对 Java 新一代垃圾回收器 ZGC 的探索与实践
- java中的装箱和拆箱
- 2019年的6个JavaScript用户认证库
- 又一个小而美的Java Web框架:Solon
- 万字详文:Java内存泄漏、性能优化、宕机死锁的N种姿势
- 笔记本电池保养方法