这周看完线程池,然后并发相关的学习就先告一段落。
本周最后要学习的类是 ThreadPoolExecutor,它从 Executor 接口一层层继承下来,因此总共要学习四个类。主要的学习来源是《深度解读 java 线程池设计思想及源码实现》 ,这篇文章写得很好,但是需要结合源码慢慢阅读。
要学习的四个线程池类,前三个类的方法截图如下,结构还是比较清晰的:
下面分别记述吧。
Executor Executor 是一个接口,只有一个方法,就是执行一个 Runnable。
1 2 3 public interface Executor { void execute (Runnable command) ; }
这是线程池最核心的方法,我们平常使用线程池,在拿到线程池对象之后,也只需要使用这个方法了:
1 2 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10 ); executor.execute(() -> System.out.println("runnable正在执行" ));
这个方法要看到最后才能看到实现,具体的实现还是很复杂的,也很精妙。
ExecutorService ExecutorService 也是一个接口,继承自 Executor 接口,新定义了许多方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException ; <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException ; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ; }
通常来讲,我们使用线程池时,拿到的对象都是这个接口的实现,因为这个接口中的大部分方法已经足够我们使用了。就比如最常见的 Executors,使用 Executors 工厂方法获取线程池,拿到的对象很多都是 ExecutorService 接口实现。
1 2 3 ExecutorService executorService1 = Executors.newFixedThreadPool(10 ); ExecutorService executorService2 = Executors.newSingleThreadExecutor(); ExecutorService executorService3 = Executors.newCachedThreadPool();
简单回顾一下:
Executor 类只有一个方法,execute()
,执行任务,没有返回。
ExecutorService 类新定义了很多方法,其中有 submit()
,执行任务,有返回(通过 Future 类获取返回值)。
AbstractExecutorService AbstractExecutorService 是一个抽象类,继承自 ExecutorService 接口,实现了父类的 submit()
、invokeAny()
、invokeAll()
方法。
实现的方法中,其中比较重要的是 submit()
方法,我们刚刚提过,这个方法执行任务,有返回值。意思是说,在执行 submit()
方法方法之后,可以获得一个 Future 对象,这就是线程执行任务的返回值,可以通过 Future 的 get()
方法一直等到任务结束拿回任务结果。Future 的具体使用不在这里详细叙述,在此只关注实现。
AbstractExecutorService 的 submit()
方法有三种,主要的思想都是将【任务】和【返回值】封装成一个 Future 对象,然后返回这个 Future 对象。这里引入了 RunnableFuture 接口,这是一个同时继承 Runnable 和 Future 的接口,即让 Runnable 有了返回值。FutureTask 类实现了这个接口,submit()
方法也主要是通过这个类实现提交任务、返回值的。
下面是 RunnableFuture 接口和它的实现类 FutureTask 的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public interface RunnableFuture <V > extends Runnable , Future <V > { void run () ; } public class FutureTask <V > implements RunnableFuture <V > { private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ; private Callable<V> callable; public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; } public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; } ... }
下面直接附上 AbstractExecutorService 的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } private <T> T doInvokeAny (Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null ) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0 ) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this ); try { ExecutionException ee = null ; final long deadline = timed ? System.nanoTime() + nanos : 0L ; Iterator<? extends Callable<T>> it = tasks.iterator(); futures.add(ecs.submit(it.next())); --ntasks; int active = 1 ; for (;;) { Future<T> f = ecs.poll(); if (f == null ) { if (ntasks > 0 ) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0 ) break ; else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null ) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else f = ecs.take(); } if (f != null ) { --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null ) ee = new ExecutionException(); throw ee; } finally { for (int i = 0 , size = futures.size(); i < size; i++) futures.get(i).cancel(true ); } } public <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false , 0 ); } catch (TimeoutException cannotHappen) { assert false ; return null ; } } public <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true , unit.toNanos(timeout)); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null ) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false ; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0 , size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException | ExecutionException ignore) {} } } done = true ; return futures; } finally { if (!done) for (int i = 0 , size = futures.size(); i < size; i++) futures.get(i).cancel(true ); } } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null ) throw new NullPointerException(); long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false ; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); final long deadline = System.nanoTime() + nanos; final int size = futures.size(); for (int i = 0 ; i < size; i++) { execute((Runnable)futures.get(i)); nanos = deadline - System.nanoTime(); if (nanos <= 0L ) return futures; } for (int i = 0 ; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { if (nanos <= 0L ) return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } nanos = deadline - System.nanoTime(); } } done = true ; return futures; } finally { if (!done) for (int i = 0 , size = futures.size(); i < size; i++) futures.get(i).cancel(true ); } } }
ThreadPoolExecutor ThreadPoolExecutor 就是我们今天要学习的线程池,它有很多代码,我们挑着学。
从整体上讲,ThreadPoolExecutor 有两个最重要的设计,一是设定了一个线程池的内部状态数,二是通过内部类 Worker 来执行池中的任务。
内部状态数 ThreadPoolExecutor 内部维护了一个 AtomicInteger(并发安全的 Integer),这个数字是线程池的内部状态数,它是一个 int 数字,长 32 位,其中高 3 位是状态位,低 29 位是线程的数量,具体可以看这部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; } ... }
内部类Worker ThreadPoolExecutor 有一个内部类 Worker,这个类继承自 AQS,实现了 Runnable 接口,这一继承一实现特别巧妙,这个后面再说。
Worker 类实际上就是一个封装的线程,线程池按顺序执行批量任务,实际上就是内部有一些 Worker 对象,每个 Worker 对象就是一个线程,这些 worker 各自执行任务,执行完一个去任务队列里再拿一个。从这个意义上去理解,线程池其实就是 Worker 池,池中的所有 Worker 按顺序执行丢进池子里的任务。
想要更深地理解 Worker,就要去读 ThreadPoolExecutor 的其他源码了,尤其是构造方法和 execute()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } }
我先把剩下的所有代码扔上来吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
下面是终于提到的 execute()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) { reject(command); } } private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
懒得写了,列一些点吧:
线程池的构造方法中,会指定很多参数,corePoolSize 指的是核心线程数,实际上也就是核心 Worker 数。当线程池执行 execute(runnable)
方法时,扔进来一个 runnable 任务,会首先判断 Worker 的数量是否达到了核心线程数,如果不到就去创建一个 Worker 对象。核心线程数内的所有 Worker 对象都不会被销毁,执行完任务就去任务队列里面拿,任务队列空了就等。
当核心线程数满了,即有 corePoolSize 个 Worker 对象了,那么加入任务队列 workQueue 中,这是一个 AQS 阻塞队列(线程安全),当有 Worker 执行完任务后,会自己来 workQueue 里拿一个任务执行。
如果核心线程数满了,任务队列也满了,再加进来的任务,会按照最大线程数 maximumPoolSize 的大小继续创建 Worker,比如核心线程数是 10,最大线程数是 20,那么在已经有了 10 个 Worker 对象、并且任务队列也满了的情况下,可以继续创建 Worker 对象(每新加一个任务就新创建一个,直到再新创建 10 个达到最大的 20 个)。核心线程数之外的 Worker 对象,将在没有任务的可执行的情况下被销毁掉。
execute(runnable)
方法的主要作用,就是创建 Worker,启动 Worker 中的线程,这个线程在启动之后(调用 start()
方法),将由 CPU 去调度,如果分到时间片,那么将调用起线程的 run()
方法,去执行 runnable 任务。而我们上面提过了,Worker 对象实现了 Runnable 接口,线程执行的那个 Runnable 对象,实际上就是 Worker 对象(这个设计太绝了!),由此实现了让 Worker 执行任务的目的。
就写到这里吧。