线程池源码解读的示例分析
发表于:2025-02-10 作者:千家信息网编辑
千家信息网最后更新 2025年02月10日,线程池源码解读的示例分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、executectl作为AtomicInteger类存放了
千家信息网最后更新 2025年02月10日线程池源码解读的示例分析
线程池源码解读的示例分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
一、execute
ctl作为AtomicInteger类存放了类中的两种信息,在其中由高3位来保存线程池的状态,后29位来保存此时线程池中的Woker类线程数量(由此可知,线程池中的线程数量最高可以接受大约在五亿左右)。由此可见给出的runStateOf()和workerCountOf()方法分别给出了查看线程状态和线程数量的方法。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果运行的线程数小于corePoolSize,尝试创建一个新线程(Worker),并执行它的第一个command if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //线程数大于corePoolSize,将线程放入任务队列 //第一次校验线程池在运行状态 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //第二次校验,防止在第一次校验通过后线程池关闭。如果线程池关闭,在队列中删除task并拒绝task if (! isRunning(recheck) && remove(command)) reject(command); //如果线程数=0(线程都死掉了,比如:corePoolSize=0),新建线程且未指定firstTask,仅轮询任务队列 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //任务队列已满,尝试创建新线程执行task,创建失败后拒绝task //创建失败原因:1.线程池关闭;2.线程数已经达到maxPoolSize else if (!addWorker(command, false)) reject(command);}
private boolean addWorker(Runnable firstTask, boolean core) { retry: //外层循环判断线程池的状态 for (;;) { int c = ctl.get(); //线程池状态 int rs = runStateOf(c); //线程池状态:RUNNING = -1、SHUTDOWN = 0、STOP = 1、TIDYING = 2、TERMINATED if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //用CAS的方式对线程数量进行+1操作 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //worker实现了Runable接口 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //workers是一个worker数组 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程,执行的就是worker中的run() t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable//Worker是一个线程 { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //把当前Worker包装成一个thread this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }}
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会 //尝试去从任务队列当中去取的新的任务。 //在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新 //的任务。下面是getTask()的实现。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //从工作队列中取出线程 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
二、submit
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
protectedRunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable); }
可见,submit将普通的runnable包装成FutureTask并返回,再调用execute去执行。
关于线程池源码解读的示例分析问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
线程
任务
方法
状态
队列
数量
分析
问题
尝试
源码
示例
更多
由此
第一次
包装
工作
帮助
解答
运行
易行
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
饥荒服务器加载时间长
未来之役服务器英文名字看不懂
迪庆互联网科技选哪家
软件开发和服务器开发的关系
河南软件开发大学排名
生活中的通信网络技术
计算机网络技术所要考的证
数据库单元2总结
一个服务器主机如何创建多个站点
材质数据库下载
软件开发需求任务书
南京东秦网络技术服务有限公司
软件开发方法与技术
来了网络连接服务器
跟踪调查数据库
sql语句能分离数据库吗
信息网络安全体系结构
增强网络安全意识主题班会记录
奇酷互联网络科技
互联网电话服务器
网络安全saas公司
创建db2管理服务器报错
阿里数据库发展历史
意识形态和网络安全风险排查报告
反恐精英服务器维护中
开发保险软件开发
海康远程服务器重开资源管理器
广州市中拓网络技术有限公司
空数据库如何加载数据
服务器销毁