我们可以通过向ExecutorService
提交Callable
或FutureTask
来异步获取线程执行结果,但这种方式的缺陷在于Future.get()
会阻塞主线程,导致即使和它同时进行的其它线程已经执行完毕,也要等待这个耗时线程执行完才能获取结果,大大影响运行效率。
ExecutorCompletionService
聚合了Executor
和BlockingQueue
,利用它我们每次都能从阻塞队列中获取到最近执行完毕的futureTask
,而避免等待某个耗时较长的任务执行。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
@Test public void enquiry() { ExecutorService executor = Executors.newFixedThreadPool(3); CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
service.submit(this::getPriceByS1); service.submit(this::getPriceByS2); service.submit(this::getPriceByS3);
IntStream.range(0, 3).forEach(index -> { try { Future<Integer> future = service.take(); Integer value = future.get(); System.out.println("value: " + value);
executor.execute(() -> save(value)); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }
private Integer getPriceByS1() { return 100; }
private Integer getPriceByS2() { return 200; }
private Integer getPriceByS3() { return 300; }
private void save(Integer value) { System.out.println("save value: " + value); }
@Test public void multiPath() { ExecutorService executor = Executors.newFixedThreadPool(3); CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(service.submit(this::getPriceByS1)); futures.add(service.submit(this::getPriceByS2)); futures.add(service.submit(this::getPriceByS3));
Integer value = 0; try { for (int i = 0; i < 3; ++i) { try { value = service.take().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
if (value != null) { break; } } } finally { System.out.println("value: " + value);
futures.forEach(future -> future.cancel(true)); } }
|
如上,我们通过调用submit(Callable<V> task)
方法向CompletionService
提交任务,获取任务结果则是先调用take()
获取到futureTask
,再调用futureTask.get()
获取任务执行结果,如果take()
没获取到futureTask
,主线程就会一直阻塞。
UML
实现原理
CompletionService
内部聚合了Executor
和BlockingQueue
,这样向其提交的任务都会交由Executor
执行,任务结果则存放在BlockingQueue
中,于是就能利用BlockingQueue
的特性,使得在获取任务结果时,如果还没有任务完成,就可以选择阻塞或返回null。
至于任务结果是如何存放到BlockingQueue
中的,则是通过将任务包装成QueueingFuture
,QueueingFuture
继承自FutureTask
并覆盖了done()
方法:task自行完毕后将结果保存到BlockingQueue
中。
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; }
protected void done() { completionQueue.add(task); } private final Future<V> task; }
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); }
private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
public Future<V> poll() { return completionQueue.poll(); }
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
}
|