CountDownLatch, commonly called a latch, lets one or more threads wait until a set of operations running in other threads has completed.
UML
Usage examples
Concurrent tasks

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Test
    public void tasks() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // ConcurrentHashMap, because the three tasks write to the map from different pool threads
        Map<String, String> taskResults = new ConcurrentHashMap<>();
        // One count per task; await() returns once all three have counted down
        CountDownLatch countDownLatch = new CountDownLatch(3);

        Runnable task1 = () -> {
            try {
                System.out.println("Task 1 running");
                Thread.sleep(200);
                taskResults.put("task1", "result1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Count down even if the task fails, so the waiting thread is never stuck
                countDownLatch.countDown();
            }
        };
        Runnable task2 = () -> {
            try {
                System.out.println("Task 2 running");
                Thread.sleep(200);
                taskResults.put("task2", "result2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
        Runnable task3 = () -> {
            try {
                System.out.println("Task 3 running");
                Thread.sleep(200);
                taskResults.put("task3", "result3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };

        executor.execute(task1);
        executor.execute(task2);
        executor.execute(task3);

        // Block the test thread until the count reaches zero
        countDownLatch.await();
        System.out.println("All tasks finished, results: " + taskResults);
        executor.shutdown();
    }

Sample output (the order of the first three lines may vary between runs):

    Task 2 running
    Task 1 running
    Task 3 running
    All tasks finished, results: {task1=result1, task2=result2, task3=result3}
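The example above waits indefinitely, which hangs the caller if a task never counts down. As a minimal sketch of a bounded wait, the snippet below uses `await(long, TimeUnit)`, which returns `false` when the timeout elapses before the count reaches zero. The class name `TimedAwaitSketch`, the helper `runAndWait`, and the sleep and timeout values are assumptions made for illustration, not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TimedAwaitSketch {

    // Hypothetical helper: starts `workers` tasks that each sleep for taskMillis,
    // then waits at most timeoutMillis for all of them to count down.
    // Returns true if every task finished in time, false on timeout or interruption.
    static boolean runAndWait(int workers, long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        CountDownLatch done = new CountDownLatch(workers);
        try {
            for (int i = 0; i < workers; i++) {
                executor.execute(() -> {
                    try {
                        Thread.sleep(taskMillis); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown(); // always count down, even on failure
                    }
                });
            }
            // Timed variant of await(): gives up once the timeout has elapsed
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupt any task that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(3, 50, 2000)); // tasks finish well within the timeout
        System.out.println(runAndWait(3, 2000, 50)); // timeout elapses first
    }
}
```

With these values the first call should report `true` and the second `false`. Checking the boolean result lets the caller decide whether to proceed with partial results or fail fast.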
Race

    @Test
    public void race() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        // Start gate: every runner blocks on it until the main thread counts it down
        CountDownLatch begin = new CountDownLatch(1);
        // Finish line: the main thread waits until all 10 runners have counted down
        CountDownLatch end = new CountDownLatch(10);

        IntStream.range(0, 10).forEach(index -> {
            executor.execute(() -> {
                try {
                    begin.await();
                    System.out.println(String.format("Runner %s is running", index));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    // Random running time between 0 and 2700 ms
                    Thread.sleep(new Random().nextInt(10) * 300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(String.format("Runner %s reached the finish line", index));
                end.countDown();
            });
        });

        System.out.println("Ready, go!");
        begin.countDown();
        end.await();
        System.out.println("Race over!");
    }
How it works
CountDownLatch implements its concurrency control by delegating to an internal Sync object, a subclass of AbstractQueuedSynchronizer.
    package java.util.concurrent;

    import java.util.concurrent.locks.AbstractQueuedSynchronizer;

    public class CountDownLatch {

        // The AQS state holds the remaining count
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;

            Sync(int count) {
                setState(count);
            }

            int getCount() {
                return getState();
            }

            // Acquisition succeeds only once the count has reached zero
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

            // Decrement the count with a CAS loop; signal waiters on the transition to zero
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }

        private final Sync sync;

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        public void countDown() {
            sync.releaseShared(1);
        }

        public long getCount() {
            return sync.getCount();
        }

        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
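Two consequences follow from the Sync code above: once the state is 0, tryReleaseShared returns false without changing it, so further countDown() calls do nothing and the latch cannot be reused; and tryAcquireShared succeeds whenever the state is 0, so await() returns immediately on an already-open latch. The sketch below observes both through the public API. The class name `LatchStateSketch` and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchStateSketch {

    // Records getCount() before and after each countDown() on a latch of 2,
    // including one extra countDown() after the count has already hit zero.
    static long[] countsAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(2);
        long[] seen = new long[4];
        seen[0] = latch.getCount();
        latch.countDown();
        seen[1] = latch.getCount();
        latch.countDown();
        seen[2] = latch.getCount();
        latch.countDown(); // state is already 0, so this call changes nothing
        seen[3] = latch.getCount();
        return seen;
    }

    // Calls the timed await with a zero timeout, so it never blocks:
    // the result only reflects whether the count is already zero.
    static boolean zeroTimeoutAwait(int initialCount) {
        CountDownLatch latch = new CountDownLatch(initialCount);
        try {
            return latch.await(0, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countsAfterExtraCountDown())); // [2, 1, 0, 0]
        System.out.println(zeroTimeoutAwait(0)); // true: latch is already open
        System.out.println(zeroTimeoutAwait(1)); // false: count is still 1
    }
}
```

Because the count never goes back up, a scenario that needs a resettable barrier calls for a different synchronizer, such as CyclicBarrier.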