队列同步器(AbstractQueuedSynchronizer),是用来构建锁或其它同步组件的基础框架。它既支持独占式的获取同步状态,也支持共享式的获取同步状态,像可重入锁(ReentrantLock)、可重入读写锁(ReentrantReadWriteLock)、计数器闭锁(CountDownLatch)、信号量(Semaphore)等常用同步组件都是基于队列同步器实现的。 队列同步器是基于模板方法模式设计的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器聚合 在自定义的同步组件中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
重写同步器指定的方法时,需要使用同步器提供的3个方法来访问或修改同步状态。
getState(): 获取同步状态
setState(int newState): 设置当前同步状态
compareAndSetState(int expect, int update): 使用CAS设置当前状态,该方法能够保证状态设置的原子性
同步器提供了以下5个可重写的方法:
boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再及进行CAS设置同步状态
boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
boolean tryReleaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
实现自定义同步组件时,将会调用以下同步器提供的模板方法:
void acquire(int arg)
void acquireInterruptibly(int arg)
boolean tryAcquireNanos(int arg, long nanos)
void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean tryAcquireSharedNanos(int arg, long nanos)
boolean release(int arg)
boolean releaseShared(int arg)
Collection getQueuedThreads()
下面通过实现一个独占锁和一个同步工具类TwinsLock
:同一时刻只允许至多两个线程的访问,来演示如何使用队列同步器。
独占锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public class Mutex implements Lock , java .io .Serializable { private final Sync sync; public Mutex () { sync = new Sync(); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { assert arg == 1 ; if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { assert arg == 1 ; if (getState() == 0 ) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } final ConditionObject newCondition () { return new ConditionObject(); } private void readObject (ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0 ); } } @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } public boolean isLocked () { return sync.isHeldExclusively(); } public boolean hasQueuedThreads () { return sync.hasQueuedThreads(); } public static void main (String[] args) { Mutex mutex = new Mutex(); mutex.lock(); try { } finally { mutex.unlock(); } } }
TwinsLock:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class TwinsLock implements Lock , java .io .Serializable { private final Sync sync; public TwinsLock () { sync = new Sync(2 ); } private static class Sync extends AbstractQueuedSynchronizer { public Sync (int count) { setState(count); } @Override protected int tryAcquireShared (int arg) { for (; ; ) { int c = getState(); int nextc = c - arg; if (nextc < 0 || compareAndSetState(c, nextc)) { return nextc; } } } @Override protected boolean tryReleaseShared (int arg) { for (; ; ) { int c = getState(); int nextc = c + arg; if (compareAndSetState(c, nextc)) { return true ; } } } final ConditionObject newCondition () { return new ConditionObject(); } } @Override public void lock () { sync.acquireShared(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquireShared(1 ) > 0 ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.releaseShared(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } }
队列同步器的实现原理可概括为: 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。 在释放同步状态时,同步器调用tryRelease(int arg)
方法释放同步状态,然后唤醒头节点中的后继节点。 同步状态的获取与释放都会同步更新状态变量state
的值,该值是volatile
类型,利用了volatile相关的Happens-Before规则 。