日常开发中,可能会遇到某个业务方法中需要调用多个 rpc 接口,这时就可以考虑利用多线程技术提高执行效率。
Thread.join
其作用是调用线程等待子线程完成后,才能继续用下运行。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Test public void joinTest() throws InterruptedException { Map<String, String> taskResults = new HashMap<>();
Runnable task1 = () -> { try { System.out.println("任务1执行中"); Thread.sleep(200); taskResults.put("task1", "result1"); } catch (InterruptedException e) { e.printStackTrace(); } };
Runnable task2 = () -> { try { System.out.println("任务2执行中"); Thread.sleep(200); taskResults.put("task2", "result2"); } catch (InterruptedException e) { e.printStackTrace(); } };
Runnable task3 = () -> { try { System.out.println("任务3执行中"); Thread.sleep(200); taskResults.put("task3", "result3"); } catch (InterruptedException e) { e.printStackTrace(); } };
Thread thread1 = new Thread(task1); Thread thread2 = new Thread(task2); Thread thread3 = new Thread(task3);
thread1.start(); thread2.start(); thread3.start();
thread1.join(); thread2.join(); thread3.join();
System.out.println("所有任务已执行结束,任务结果:" + taskResults.toString()); }
|
输出:
1 2 3 4
| 任务2执行中 任务1执行中 任务3执行中 所有任务已执行结束,任务结果:{task1=result1, task2=result2, task3=result3}
|
缺点:
要调用 Thread.join() 方法,就得显示创建线程,从而不能利用线程池以减少资源的消耗。
Thread.invokeAll
其作用是触发执行任务列表,返回的结果顺序与任务在任务列表中的顺序一致。所有线程执行完任务后才返回结果。如果设置了超时时间,未超时完成则正常返回结果,如果超时未完成则报异常。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Test public void invokeAllTest() throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(5);
Callable<String> task1 = () -> { System.out.println("任务1执行中"); Thread.sleep(200); return "result1"; };
Callable<String> task2 = () -> { System.out.println("任务2执行中"); Thread.sleep(200); return "result2"; };
Callable<String> task3 = () -> { System.out.println("任务3执行中"); Thread.sleep(200); return "result3"; };
List<Future<String>> futures = executor.invokeAll(Arrays.asList(task1, task2, task3));
for (Future future : futures) { System.out.println(future.get()); } }
|
输出:
1 2 3 4 5 6
| 任务2执行中 任务1执行中 任务3执行中 result1 result2 result3
|
观察”do work”输出可知线程是并行执行的,观察”result”输出可知future结果是顺序序输出的。虽然输出与前面的FutureTask方式一样,但效率它会高些,因为FutureTask
的get
方法会阻塞后面的操作。
FutureTask
FutureTask
适用于异步获取执行结果或取消执行任务的场景。
由于它同时实现了Runnable
和Future
接口,因此它既可以作为Runnable
被线程执行,又可以作为Future
得到Callable
的返回值。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Test public void test1() throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10);
FutureTask<String> task1 = new FutureTask<>(() -> { System.out.println(Thread.currentThread().getName() + ": task1"); Thread.sleep(200); return "result1"; }); FutureTask<String> task2 = new FutureTask<>(() -> { System.out.println(Thread.currentThread().getName() + ": task2"); Thread.sleep(200); return "result2"; }); FutureTask<String> task3 = new FutureTask<>(() -> { System.out.println(Thread.currentThread().getName() + ": task3"); Thread.sleep(200); return "result3"; });
executor.submit(task1); executor.submit(task2); executor.submit(task3);
String result1 = task1.get(); String result2 = task2.get(); String result3 = task3.get();
System.out.println(result1); System.out.println(result2); System.out.println(result3); }
|
输出:
1 2 3 4 5 6
| 任务2执行中 任务1执行中 任务3执行中 result1 result2 result3
|
缺点:
由于调用 Future.get() 操作会阻塞当前线程,这样假如 task1 任务执行的时间很长,那么即使其他 tasks 都已执行完毕,我们也得等待 task 执行完,从而不能优先处理最先完成的任务结果。
CountDownLatch
CountDownLatch
,通常称之为闭锁,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| @Test public void countDownLatchTest() throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5);
Map<String, String> taskResults = new HashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(3);
Runnable task1 = () -> { try { System.out.println("任务1执行中"); Thread.sleep(200); taskResults.put("task1", "result1"); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown(); };
Runnable task2 = () -> { try { System.out.println("任务2执行中"); Thread.sleep(200); taskResults.put("task2", "result2"); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown(); };
Runnable task3 = () -> { try { System.out.println("任务3执行中"); Thread.sleep(200); taskResults.put("task3", "result3"); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown(); };
executor.execute(task1); executor.execute(task2); executor.execute(task3);
countDownLatch.await();
System.out.println("所有任务已执行结束,任务结果:" + taskResults.toString()); }
|
输出:
1 2 3 4
| 任务2执行中 任务1执行中 任务3执行中 所有任务已执行结束,任务结果:{task1=result1, task2=result2, task3=result3}
|
CyclicBarrier
CyclicBarrier
,即可重用屏障/栅栏,它允许一组线程彼此等待到达一个共同的障碍点,并可以设置线程都到达障碍点后要执行的命令。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| @Test public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException { Map<String, String> taskResults = new HashMap<>();
CyclicBarrier barrier = new CyclicBarrier(4, () -> { System.out.println("所有任务已执行结束,任务结果:" + taskResults.toString()); });
Runnable task1 = () -> { try { System.out.println("任务1执行中"); Thread.sleep(200); taskResults.put("task1", "result1"); } catch (InterruptedException e) { e.printStackTrace(); }
try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } };
Runnable task2 = () -> { try { System.out.println("任务2执行中"); Thread.sleep(200); taskResults.put("task2", "result2"); } catch (InterruptedException e) { e.printStackTrace(); }
try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } };
Runnable task3 = () -> { try { System.out.println("任务3执行中"); Thread.sleep(200); taskResults.put("task3", "result3"); } catch (InterruptedException e) { e.printStackTrace(); }
try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } };
new Thread(task1).start(); new Thread(task2).start(); new Thread(task3).start();
barrier.await(); }
|
输出:
1 2 3 4
| 任务2执行中 任务1执行中 任务3执行中 所有任务已执行结束,任务结果:{task1=result1, task2=result2, task3=result3}
|
CompletionService
CompletionService
适用于异步获取并行任务执行结果。
使用Future
和Callable
可以获取线程执行结果,但获取方式确是阻塞的,根据添加到线程池中的线程顺序,依次获取,获取不到就阻塞。CompletionService
采用轮询的方式,可以做到异步非阻塞获取执行结果。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Test public void completionServiceTest() { ExecutorService executor = Executors.newFixedThreadPool(5); CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
Callable<String> task1 = () -> { System.out.println("任务1执行中"); Thread.sleep(200); return "result1"; };
Callable<String> task2 = () -> { System.out.println("任务2执行中"); Thread.sleep(200); return "result2"; };
Callable<String> task3 = () -> { System.out.println("任务3执行中"); Thread.sleep(200); return "result3"; };
completionService.submit(task1); completionService.submit(task2); completionService.submit(task3);
IntStream.range(0, 3).forEach(index -> { try { Future<String> future = completionService.take(); System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }
|
输出:
1 2 3 4 5 6
| 任务3执行中 任务1执行中 任务2执行中 result3 result2 result1
|
CompletableFuture
在Java8中,CompletableFuture
提供了非常强大的Future
的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture
的方法。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| @Test public void completableFutureTest() {
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<String, String> results = new HashMap<>();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("任务1执行中"); Thread.sleep(200); System.out.println("任务1执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } return "result1"; }, executor ).whenComplete((v, e) -> results.put("task1", v));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("任务2执行中"); Thread.sleep(200); System.out.println("任务2执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } return "result2"; }, executor ).whenComplete((v, e) -> results.put("task2", v));
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { System.out.println("任务3执行中"); Thread.sleep(200); System.out.println("任务3执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } return "result3"; }, executor ).whenComplete((v, e) -> results.put("task3", v));
CompletableFuture.allOf(future1, future2, future3) .thenRun(() -> System.out.println("所有任务已执行结束,任务结果:" + results.toString())) .join(); }
|
输出:
1 2 3 4 5 6 7
| 任务2执行中 任务1执行中 任务3执行中 任务2执行完毕 任务1执行完毕 任务3执行完毕 所有任务已执行结束,任务结果:[result1, result2, result3]
|
ParallelStream
对于计算密集型任务,可以使用ParallelStream
,其底层使用Fork/Join框架实现。
示例:
1 2 3 4 5 6 7 8 9 10 11 12
| @Test public void forkJoinTest() { IntStream.range(0, 10).parallel().forEach((index) -> System.out.println(String.format("任务%s执行中", index)));
List<String> results = IntStream.range(0, 10).parallel() .mapToObj((index) -> { System.out.println(String.format("任务%s执行中", index)); return String.format("result%s", index); }).collect(Collectors.toList());
System.out.println("所有任务已执行结束,任务结果:" + results.toString()); }
|
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 任务6执行中 任务5执行中 任务2执行中 任务4执行中 任务1执行中 任务3执行中 任务0执行中 任务8执行中 任务7执行中 任务9执行中 任务6执行中 任务1执行中 任务2执行中 任务4执行中 任务3执行中 任务8执行中 任务0执行中 任务7执行中 任务5执行中 任务9执行中 所有任务已执行结束,任务结果:[result0, result1, result2, result3, result4, result5, result6, result7, result8, result9]
|
缺点:
默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。