人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

并发工具类--抽象队列同步器

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或其它同步组件的基础框架。它既支持独占式的获取同步状态,也支持共享式的获取同步状态,像可重入锁(ReentrantLock)、可重入读写锁(ReentrantReadWriteLock)、计数器闭锁(CountDownLatch)、信号量(Semaphore)等常用同步组件都是基于队列同步器实现的。
队列同步器是基于模板方法模式设计的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器聚合在自定义的同步组件中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

重写同步器指定的方法时,需要使用同步器提供的3个方法来访问或修改同步状态。

  • getState(): 获取同步状态
  • setState(int newState): 设置当前同步状态
  • compareAndSetState(int expect, int update): 使用CAS设置当前状态,该方法能够保证状态设置的原子性

同步器提供了以下5个可重写的方法:

  • boolean tryAcquire(int arg)
    独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再及进行CAS设置同步状态
  • boolean tryRelease(int arg)
    独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
  • int tryAcquireShared(int arg)
    共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
  • boolean tryReleaseShared(int arg)
    共享式释放同步状态
  • boolean isHeldExclusively()
    当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

实现自定义同步组件时,将会调用以下同步器提供的模板方法:

  • void acquire(int arg)
  • void acquireInterruptibly(int arg)
  • boolean tryAcquireNanos(int arg, long nanos)
  • void acquireShared(int arg)
  • void acquireSharedInterruptibly(int arg)
  • boolean tryAcquireSharedNanos(int arg, long nanos)
  • boolean release(int arg)
  • boolean releaseShared(int arg)
  • Collection getQueuedThreads()

下面通过实现一个独占锁和一个同步工具类TwinsLock:同一时刻只允许至多两个线程的访问,来演示如何使用队列同步器。

独占锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class Mutex implements Lock, java.io.Serializable {
private final Sync sync;

public Mutex() {
sync = new Sync();
}

private static class Sync extends AbstractQueuedSynchronizer {

/**
* 获取锁
*/
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 释放锁
*/
@Override
protected boolean tryRelease(int arg) {
assert arg == 1; // Otherwise unused
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

/**
* 是否处于独占状态
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

/**
* 返回一个条件对象,每个条件对象都包含一个等待队列
*/
final ConditionObject newCondition() {
return new ConditionObject();
}

/**
* 反序列化时,重置state为未锁定状态
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public static void main(String[] args) {
Mutex mutex = new Mutex();
mutex.lock();

try {
// TODO
} finally {
mutex.unlock();
}
}
}

TwinsLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class TwinsLock implements Lock, java.io.Serializable {
private final Sync sync;

public TwinsLock() {
sync = new Sync(2);
}

private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
setState(count);
}

/**
* 获取锁
*/
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {
int c = getState();
int nextc = c - arg;
// 小于0时直接返回,表示获取锁失败;大于0则调用CAS
if (nextc < 0 || compareAndSetState(c, nextc)) {
return nextc;
}
}
}

/**
* 释放锁
*/
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int c = getState();
int nextc = c + arg;
if (compareAndSetState(c, nextc)) {
return true;
}
}
}

/**
* 返回一个条件对象,每个条件对象都包含一个等待队列
*/
final ConditionObject newCondition() {
return new ConditionObject();
}
}

@Override
public void lock() {
sync.acquireShared(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.releaseShared(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

队列同步器的实现原理可概括为:
在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。
在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点中的后继节点。
同步状态的获取与释放都会同步更新状态变量state的值,该值是volatile类型,利用了volatile相关的Happens-Before规则

小礼物走一走,来 Github 关注我