人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

CyclicBarrier,即可重用屏障/栅栏,它允许一组线程彼此等待到达一个共同的障碍点,并可以设置线程都到达障碍点后要执行的命令。CyclicBarrier在包含固定大小的线程的程序中非常有用,这些线程有时必须彼此等待。这个屏障被称为循环,因为它可以在释放等待的线程之后重用。

UML

CyclicBarrier

示例

并发任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* 模拟并发任务
*/
@Test
public void tasks() throws InterruptedException, BrokenBarrierException {
// 存放任务及其执行结果映射
Map<String, String> taskResults = new HashMap<>();

// 创建栅栏:等待指定数目的任务都执行完后,输出所有任务执行结果
// 注意:还需要考虑主线程
CyclicBarrier barrier = new CyclicBarrier(4, () -> {
System.out.println("所有任务已执行结束,任务结果:" + taskResults.toString());
});

// 模拟任务1
Runnable task1 = () -> {
// 模拟任务执行
try {
System.out.println("任务1执行中");
Thread.sleep(200);
taskResults.put("task1", "result1");
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
// 执行完成,等待屏障
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};

// 模拟任务2
Runnable task2 = () -> {
// 模拟任务执行
try {
System.out.println("任务2执行中");
Thread.sleep(200);
taskResults.put("task2", "result2");
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
// 执行完成,等待屏障
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};

// 模拟任务3
Runnable task3 = () -> {
// 模拟任务执行
try {
System.out.println("任务3执行中");
Thread.sleep(200);
taskResults.put("task3", "result3");
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
// 执行完成,等待屏障
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};

// 提交任务1
new Thread(task1).start();
// 提交任务2
new Thread(task2).start();
// 提交任务3
new Thread(task3).start();

// 主线程等待屏障
barrier.await();
}
1
2
3
4
任务2执行中
任务1执行中
任务3执行中
所有任务已执行结束,任务结果:{task1=result1, task2=result2, task3=result3}

源码

CyclicBarrier是通过内部聚合ReentrantLock和等待队列Condition来实现并发控制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 线程个数
this.parties = parties;
// 还未到达屏障点的线程个数
this.count = parties;
// 所有线程到达屏障点后,要执行的命令
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 利用可重入锁,保证同时只有一个线程能进入
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 每次调用该方法,count自减
int index = --count;
// 等于0时,表示所有线程都已到达屏障
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果构造函数有传入command,就执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 所有线程没有全部到达屏障时
for (;;) {
try {
// 如果未设置超时,调用Contion.await(),将线程放入等待队列
if (!timed)
trip.await();
//
else if (nanos > 0L)
// 如果设置了超时,调用Contion.awaitNanos(nanos),将线程放入等待队列
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

对比 CountDownLatch

  • CountDownLatch主要用来解决一个线程等待多个线程的场景,而CyclicBarrier是一组线程之间互相等待
  • CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有线程调用await(),该线程会直接通过。但CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到最开始设置的初始值。
  • CyclicBarrier还可以设置回调函数,可以说是功能丰富。

Semaphore,即信号量,是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

模型

信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()down()up()。我们可以结合下图来形象化地理解:
信号量模型图

这三个方法详细的语义具体如下所示:

  • init():设置计数器的初始值。
  • down():计数器的值减 1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加 1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。

UML

在Java SDK里面,信号量模型是由Semaphore实现的。
Semaphore

使用示例

获取数据库连接

假设有以下需求:要读取几万个文件的数据,因为都是IO密集型任务,因此可以启动几十个线程并发地读取,但是如果读取到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时就必须控制只有10个线程能同时获取到数据库连接,否则会报错提示无法获取数据库连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 模拟获取数据库连接后持久化数据
*/
@Test
public void test1() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(30);

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(30);
// 创建信号量
Semaphore semaphore = new Semaphore(10);

IntStream.range(0, 30).forEach(index -> {
executor.execute(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("save data: " + index);

semaphore.release();

countDownLatch.countDown();
});
});

countDownLatch.await();
}

资源池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 基于Semaphore实现资源池
*/
class Pool {
private static final int MAX_AVAILABLE = 10;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x)) {
available.release();
}
}

protected Object[] items = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else {
return false;
}
}
}
return false;
}
}

实现原理

Semaphore是通过内部聚合AbstractQueuedSynchronizer的子类来Sync实现并发控制的,并且它同时支持公平或非公平锁的获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 基类,有公平和非公平两个子类
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}

/**
* 非公平获取
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

/**
* 非公平版本
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

/**
* 公平版本
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 首节点才有机会执行
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

CountDownLatch,通常称之为闭锁,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

UML

CountDownLatch

使用示例

并发任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 模拟并发任务
*/
@Test
public void tasks() throws InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 存放任务及其执行结果映射
Map<String, String> taskResults = new HashMap<>();

// 创建闭锁:当count减为0时,线程才会继续往下执行
CountDownLatch countDownLatch = new CountDownLatch(3);

// 模拟任务1
Runnable task1 = () -> {
// 模拟任务执行
try {
System.out.println("任务1执行中");
Thread.sleep(200);
taskResults.put("task1", "result1");
} catch (InterruptedException e) {
e.printStackTrace();
}

countDownLatch.countDown();
};

// 模拟任务2
Runnable task2 = () -> {
// 模拟任务执行
try {
System.out.println("任务2执行中");
Thread.sleep(200);
taskResults.put("task2", "result2");
} catch (InterruptedException e) {
e.printStackTrace();
}

countDownLatch.countDown();
};

// 模拟任务3
Runnable task3 = () -> {
// 模拟任务执行
try {
System.out.println("任务3执行中");
Thread.sleep(200);
taskResults.put("task3", "result3");
} catch (InterruptedException e) {
e.printStackTrace();
}

countDownLatch.countDown();
};

// 提交任务1
executor.execute(task1);
// 提交任务2
executor.execute(task2);
// 提交任务3
executor.execute(task3);

// 等待所有任务执行完毕
countDownLatch.await();

System.out.println("所有任务已执行结束,任务结果:" + taskResults.toString());
}
1
2
3
4
任务2执行中
任务1执行中
任务3执行中
所有任务已执行结束,任务结果:{task1=result1, task2=result2, task3=result3}

赛跑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 模拟赛跑,发令枪响后同时开跑,所有人到达后比赛结束
*/
@Test
public void race() throws InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);

// 创建闭锁begin:控制线程同时开始
CountDownLatch begin = new CountDownLatch(1);
// 创建闭锁end:控制当线程都结束后,主线程才继续执行
CountDownLatch end = new CountDownLatch(10);

IntStream.range(0, 10).forEach(index -> {
executor.execute(() -> {
// 选手员等待发令枪
try {
begin.await();
System.out.println(String.format("选手:%s正在赛跑中", index));
} catch (InterruptedException e) {
e.printStackTrace();
}

// 选手赛跑耗时
try {
Thread.sleep(new Random().nextInt(10) * 300);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(String.format("选手:%s到达终点", index));

end.countDown();
});
});

System.out.println("预备,跑!");
// 发令枪响,所有选手开跑
begin.countDown();

// 等待所有选手到达终点
end.await();

System.out.println("比赛结束!");
}

实现原理

Semaphore是通过内部聚合AbstractQueuedSynchronizer的子类来Sync实现并发控制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}

public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}

思想

动态规划算法通常用于求解具有某种最优性质的问题。在这类问题中,可能会有许多可行解。每一个解都对应于一个值,我们希望找到具有最优值的解。
动态规划算法与分治法类似,其基本思想也是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到原问题的解。与分治法不同的是,适合于用动态规划求解的问题,经分解得到子问题往往不是互相独立的。若用分治法来解这类问题,则分解得到的子问题数目太多,有些子问题被重复计算了很多次。如果我们能够保存已解决的子问题的答案,而在需要时再找出已求得的答案,这样就可以避免大量的重复计算,节省时间。
我们可以用一个表来记录所有已解的子问题的答案。不管该子问题以后是否被用到,只要它被计算过,就将其结果填入表中。这就是动态规划法的基本思路。具体的动态规划算法多种多样,但它们具有相同的填表格式。

动态规划程序设计是对解最优化问题的一种途径、一种方法,而不是一种特殊算法。不像搜索或数值计算那样,具有一个标准的数学表达式和明确清晰的解题方法。动态规划程序设计往往是针对一种最优化问题,由于各种问题的性质不同,确定最优解的条件也互不相同,因而动态规划的设计方法对不同的问题,有各具特色的解题方法,而不存在一种万能的动态规划算法,可以解决各类最优化问题。因此读者在学习时,除了要对基本概念和方法正确理解外,必须具体问题具体分析处理,以丰富的想象力去建立模型,用创造性的技巧去求解。我们也可以通过对若干有代表性的问题的动态规划算法进行分析、讨论,逐渐学会并掌握这一设计方法。

概念

一个模型三个特征:

  • 多阶段决策最优解模型
    如果一类活动过程可以分为若干个互相联系的阶段,在每一个阶段都需作出决策(采取措施),一个阶段的决策确定以后,常常影响到下一个阶段的决策,从而就完全确定了一个过程的活动路线,则称它为多阶段决策问题。
    各个阶段的决策构成一个决策序列,称为一个策略。每一个阶段都有若干个决策可供选择,因而就有许多策略供我们选取,对应于一个策略可以确定活动的效果,这个效果可以用数量来确定。策略不同,效果也不同,多阶段决策问题,就是要在可以选择的那些策略中间,选取一个最优策略,使在预定的标准下达到最好的效果.
  • 最优子结构
    一个最优化策略具有这样的性质,不论过去状态和决策如何,对前面的决策所形成的状态而言,余下的诸决策必须构成最优策略。简而言之,一个最优化策略的子策略总是最优的。一个问题满足最优化原理又称其具有最优子结构性质。
  • 无后效性
    将各阶段按照一定的次序排列好之后,对于某个给定的阶段状态,它以前各阶段的状态无法直接影响它未来的决策,而只能通过当前的这个状态。换句话说,每个状态都是过去历史的一个完整总结。这就是无后向性,又称为无后效性。
  • 重复子问题
    不同的决策序列,到达某个相同的阶段时,可能会产生重复的状态。动态规划将原来具有指数级时间复杂度的搜索算法改进成了具有多项式时间复杂度的算法。其中的关键在于解决冗余,这是动态规划算法的根本目的。动态规划实质上是一种以空间换时间的技术,它在实现的过程中,不得不存储产生过程中的各种状态,所以它的空间复杂度要大于其它的算法。

思路

  • 状态转移表法
    状态表一般都是二维的,所以你可以把它想象成二维数组。其中,每个状态包含三个变量,行、列、数组值。我们根据决策的先后过程,从前往后,根据递推关系,分阶段填充状态表中的每个状态。最后,我们将这个递推填表的过程,翻译成代码,就是动态规划代码了。
    思路大致可以概括为:回溯算法实现 - 定义状态 - 画递归树 - 找重复子问题- 画状态转移表 - 根据递推关系填表 - 将填表过程翻译成代码。

  • 状态转移方程法
    状态转移方程法有点类似递归的解题思路。我们需要分析,某个问题如何通过子问题来递归求解,也就是所谓的最优子结构。据最优子结构,写出递归公式,也就是所谓的状态转移方程。状态转移方程的一般形式:
    一般形式: U:状态; X:策略
    顺推:f[Uk]=opt{f[Uk-1]+L[Uk-1,Xk-1]} 其中, L[Uk-1,Xk-1]: 状态Uk-1通过策略Xk-1到达状态Uk的费用,初始f[U1];结果:f[Un]。
    倒推:
      f[Uk]=opt{f[Uk+1]+L[Uk,Xk]}
      L[Uk,Xk]: 状态Uk通过策略Xk到达状态Uk+1 的费用
      初始f[Un];结果:f(U1)
    思路大致可以概括为:找最优子结构 - 写状态转移方程 - 将状态转移方程翻译成代码。

过程

  1. 确定问题的决策对象
  2. 对决策过程划分阶段
  3. 对各阶段确定状态变量
  4. 根据状态变量确定费用函数和目标函数
  5. 建立各阶段状态变量的转移过程,确定状态转移方程

分类

  • 线性动规:拦截导弹、合唱队行、挖地雷、建学校、剑客决斗等
  • 区域动规:石子合并、加分二叉树、统计单词个数、炮兵布阵等
  • 树形动规:贪吃的九头龙、二分查找树、聚会的欢乐、数字三角形等
  • 背包问题:01背包问题、完全背包问题、分组背包问题、二维背包、装箱问题、挤牛奶(同济ACM第1132题)等

实际应用

  • 最短路径
  • 最长公共子串长度

示例应用

0-1背包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* 计算背包最大承载重量
* 使用二维数组,存储各阶段的状态集合
*
* @param items 物品数组
* @param n 物品个数
* @param w 背包可承载重量
* @return 实际背包承载重量
*/
public int compute(int[] items, int n, int w) {
// 二维数组,初始值都为false
boolean[][] states = new boolean[n][w + 1];
// 第一行的数据要特殊处理,可以利用哨兵优化
states[0][0] = true;

// 如果第一个物品的重量小于w,就放进背包
if (items[0] <= w) {
states[0][items[0]] = true;
}

// 动态规划状态转移
for (int i = 1; i < n; ++i) {
// 不把第i个物品放入背包
for (int j = 0; j <= w; ++j) {
if (states[i - 1][j]) {
// 此时的状态等于前一个阶段的状态
states[i][j] = states[i - 1][j];
}
}

// 把第i个物品放入背包
// j <= w - weight[i]:保证背包重量不会超出
for (int j = 0; j <= w - items[i]; ++j) {
if (states[i - 1][j]) {
states[i][j + items[i]] = true;
}
}
}

// 输出结果
for (int i = w; i >= 0; --i) {
if (states[n - 1][i]) {
return i;
}
}

return 0;
}

/**
* 计算背包最大承载重量
* 使用一维数组,只保存当前阶段的状态集合
*
* @param items 物品数组
* @param n 物品个数
* @param w 背包可承载重量
* @return 实际背包承载重量
*/
public int compute2(int[] items, int n, int w) {
// 一维数组,初始值都为false
boolean[] states = new boolean[w + 1];
// 第一行的数据要特殊处理,可以利用哨兵优化
states[0] = true;

// 如果第一个物品的重量小于w,就放进背包
if (items[0] <= w) {
states[items[0]] = true;
}

// 动态规划
for (int i = 1; i < n; ++i) {
// 把第 i 个物品放入背包
// j从大到小循环,保证不会重复
for (int j = w - items[i]; j >= 0; --j) {
if (states[j]) {
states[j + items[i]] = true;
}
}
}

// 输出结果
for (int i = w; i >= 0; --i) {
if (states[i]) {
return i;
}
}
return 0;
}

/**
* 计算背包最大承载重量,同时保证相同重量下价值最大
* 使用二维数组,存储各阶段的状态集合
*
* @param items 物品重量数组
* @param values 物品价格数组
* @param n 物品个数
* @param w 背包可承载重量
* @return 实际背包承载重量
*/
public int compute3(int[] items, int[] values, int n, int w) {
int[][] states = new int[n][w+1];

// 初始化 states
for (int i = 0; i < n; ++i) {
for (int j = 0; j < w+1; ++j) {
states[i][j] = -1;
}
}

// 第一行的数据要特殊处理
states[0][0] = 0;

// 如果第一个物品的重量小于w,就放进背包
if (items[0] <= w) {
states[0][items[0]] = values[0];
}

// 动态规划,状态转移
for (int i = 1; i < n; ++i) {
// 不选择第i个物品
for (int j = 0; j <= w; ++j) {
if (states[i-1][j] >= 0) {
// 此时的状态等于前一个阶段的状态
states[i][j] = states[i-1][j];
}
}

// 选择第i个物品
for (int j = 0; j <= w-items[i]; ++j) {
if (states[i-1][j] >= 0) {
int v = states[i-1][j] + values[i];
// 只存储最大值
if (v > states[i][j+items[i]]) {
states[i][j+items[i]] = v;
}
}
}
}

// 找出最大值
int maxvalue = -1;
for (int j = 0; j <= w; ++j) {
if (states[n-1][j] > maxvalue) {
maxvalue = states[n-1][j];
}
}

return maxvalue;
}

最短路径

假设我们有一个 n 乘以 n 的矩阵 w[n][n]。矩阵存储的都是正整数。棋子起始位置在左上角,终止位置在右下角。我们将棋子从左上角移动到右下角。每次只能向右或者向下移动一位。从左上角到右下角,会有很多不同的路径可以走。我们把每条路径经过的数字加起来看作路径的长度。那从左上角移动到右下角的最短短路径长度是多少呢?

  • 回溯算法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private int minDist = Integer.MAX_VALUE;
    public void minDistBT(int i, int j, int dist, int[][] w, int n) {
    // 到达了 n-1, n-1 这个位置了
    if (i == n && j == n) {
    if (dist < minDist) {
    minDist = dist;
    }
    return;
    }
    if (i < n) {
    // 往下走,更新 i=i+1, j=j
    minDistBT(i + 1, j, dist+w[i][j], w, n);
    }
    if (j < n) {
    // 往右走,更新 i=i, j=j+1
    minDistBT(i, j+1, dist+w[i][j], w, n);
    }
    }
  • 动态规划-状态转移表法
    从前往后计算各阶段状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public int minDistDP(int[][] matrix, int n) {
    // 定义二维状态表
    int[][] states = new int[n][n];

    // 初始化states的第一行数据
    int sum = 0;
    for (int j = 0; j < n; ++j) {
    sum += matrix[0][j];
    states[0][j] = sum;
    }

    // 初始化states的第一列数据
    sum = 0;
    for (int i = 0; i < n; ++i) {
    sum += matrix[i][0];
    states[i][0] = sum;
    }

    for (int i = 1; i < n; ++i) {
    for (int j = 1; j < n; ++j) {
    // 状态方程
    states[i][j] = matrix[i][j] + Math.min(states[i][j-1], states[i-1][j]);
    }
    }
    return states[n-1][n-1];
    }
  • 动态规划-状态转移方程法
    从后往前迭代递推前阶段的状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private int[][] matrix = {{1, 3, 5, 9}, {2, 1, 3, 4}, {5, 2, 6, 7}, {6, 8, 4, 3}};
    private int[][] mem = new int[4][4];

    public int minDist(int i, int j) {
    // 终止条件
    if (i == 0 && j == 0) {
    return matrix[0][0];
    }

    // 如果计算过,直接返回
    if (mem[i][j] > 0) {
    return mem[i][j];
    }

    // 可能1:由左边到当前步骤
    int minLeft = Integer.MAX_VALUE;
    if (j - 1 >= 0) {
    minLeft = minDist(i, j - 1);
    }

    // 可能2:由上面到当前步骤
    int minUp = Integer.MAX_VALUE;
    if (i - 1 >= 0) {
    minUp = minDist(i - 1, j);
    }

    // 状态方程:取两种可能的最小值
    int currMinDist = matrix[i][j] + Math.min(minLeft, minUp);
    mem[i][j] = currMinDist;
    return currMinDist;
    }

实现问题

算法实现是比较好考虑的。但有时也会遇到一些问题,而使算法难以实现。动态规划思想设计的算法从整体上来看基本都是按照得出的递推关系式进行递推,这种递推相对于计算机来说,只要设计得当,效率往往是比较高的,这样在时间上溢出的可能性不大,而相反地,动态规划需要很大的空间以存储中间产生的结果,这样可以使包含同一个子问题的所有问题共用一个子问题解,从而体现动态规划的优越性,但这是以牺牲空间为代价的,为了有效地访问已有结果,数据也不易压缩存储,因而空间矛盾是比较突出的。另一方面,动态规划的高时效性往往要通过大的测试数据体现出来(以与搜索作比较),因而,对于大规模的问题如何在基本不影响运行速度的条件下,解决空间溢出的问题,是动态规划解决问题时一个普遍会遇到的问题。
一个思考方向是尽可能少占用空间。如从结点的数据结构上考虑,仅仅存储必不可少的内容,以及数据存储范围上精打细算(按位存储、压缩存储等)。当然这要因问题而异,进行分析。另外,在实现动态规划时,一个我们经常采用的方法是用一个与结点数一样多的数组来存储每一步的决策,这对于倒推求得一种实现最优解的方法是十分方便的,而且处理速度也有一些提高。但是在内存空间紧张的情况下,我们就应该抓住问题的主要矛盾。省去这个存储决策的数组,而改成在从最优解逐级倒推时,再计算一次,选择某个可能达到这个值的上一阶段的状态,直到推出结果为止。这样做,在程序编写上比上一种做法稍微多花一点时间,运行的时效也可能会有一些(但往往很小)的下降,但却换来了很多的空间。因而这种思想在处理某些问题时,是很有意义的。

思想

回溯算法实际上一个类似枚举的搜索尝试过程,主要是在搜索尝试过程中寻找问题的解,当发现已不满足求解条件时,就“回溯”返回,尝试别的路径。回溯法是一种选优搜索法,按选优条件向前搜索,以达到目标。但当探索到某一步时,发现原先选择并不优或达不到目标,就退回一步重新选择,这种走不通就退回再走的技术为回溯法,而满足回溯条件的某个状态的点称为“回溯点”。许多复杂的,规模较大的问题都可以使用回溯法,有“通用解题方法”的美称。

过程

  1. 针对所给问题,定义问题的解空间,它至少包含问题的一个(最优)解
  2. 确定易于搜索的解空间结构,使得能用回溯法方便地搜索整个解空间
  3. 以深度优先的方式搜索解空间,并且在搜索过程中用剪枝函数避免无效搜索

示例应用

0-1背包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 计算背包最大承载重量
*
* @param items 物品重量数组
* @param n 物品个数
* @param w 背包可承载重量
* @return 实际背包承载重量
*/
public int compute(int[] items, int n, int w) {
// 记录实际背包承载重量
int[] weight = new int[]{Integer.MIN_VALUE};
// 记录当前状态是否记录过,如果是,就直接返回
boolean[][] mem = new boolean[5][10];
// 递归计算
compute(items, n, w, weight, mem, 0, 0);
return weight[0];
}

private void compute2(int[] items, int n, int w, int[] weight, boolean[][] mem, int i, int cw) {
// cw==w 表示装满了,i==n 表示物品都考察完了
if (cw == w || i == n) {
if (cw > weight[0]) {
weight[0] = cw;
}
return;
}

// 重复状态,直接返回
if (mem[i][cw]) {
return;
}

// 记录 (i, cw) 这个状态
mem[i][cw] = true;

// 选择不装第 i 个物品
compute2(items, n, w, weight, mem, i + 1, cw);

// 如果还没有满
if (cw + items[i] <= w) {
// 选择装第 i 个物品
compute2(items, n, w, weight, mem, i + 1, cw + items[i]);
}
}

8皇后

8皇后:有一个8x8的棋盘,希望往里放8个棋子(皇后),每个棋子所在的行、列、对角线都不能有另一个棋子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 下标表示行, 值表示queen存储在哪一列
*/
private int[] result = new int[8];

/**
* 递归调用,从第一行开始
*
* @param row 当前考察的行数
*/
public void cal8queens(int row) {
// 8 个棋子都放置好了,打印结果
if (row == 8) {
printQueens(result);
return;
}

// 每一行都有8种放法
for (int column = 0; column < 8; ++column) {
// 有些放法不满足要求
if (isOk(row, column)) {
// 第row行的棋子放到了column列
result[row] = column;
// 考察下一行
cal8queens(row + 1);
}
}
}

/**
* 判断row行column列放置是否合适
*/
private boolean isOk(int row, int column) {
int leftUp = column - 1, rightUp = column + 1;
// 逐行往上考察每一行
for (int i = row - 1; i >= 0; --i) {
// 第i行的 column 列有棋子吗?
if (result[i] == column) {
return false;
}

// 考察左上对角线:第i行leftUp列有棋子吗?
if (leftUp >= 0) {
if (result[i] == leftUp) {
return false;
}
}

// 考察右上对角线:第i行rightUp列有棋子吗?
if (rightUp < 8) {
if (result[i] == rightUp) {
return false;
}
}
--leftUp;
++rightUp;
}
return true;
}

/**
* 打印出一个二维矩阵
*/
private void printQueens(int[] result) {
for (int row = 0; row < 8; ++row) {
for (int column = 0; column < 8; ++column) {
if (result[row] == column) {
System.out.print("Q ");
} else {
System.out.print("* ");
}
}
System.out.println();
}
System.out.println();
}

正则表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 正则表达式:假定正则表达式中只包含“*”和“?”两种通配符
*
* @author cdrcool
*/
public class Pattern {
/**
* 是否匹配,默认false
*/
private boolean matched = false;
/**
* 正则表达式
*/
private char[] pattern;
/**
* 正则表达式长度
*/
private int plen;

public Pattern(char[] pattern, int plen) {
this.pattern = pattern;
this.plen = plen;
}

public boolean match(char[] text, int tlen) {
matched = false;
rmatch(0, 0, text, tlen);
return matched;
}

private void rmatch(int ti, int pj, char[] text, int tlen) {
// 如果已经匹配了,就不要继续递归了
if (matched) {
return;
}

// 正则表达式到结尾了
if (pj == plen) {
if (ti == tlen) {
// 文本串也到结尾了
matched = true;
}
return;
}

// * 匹配任意个字符
if (pattern[pj] == '*') {
for (int k = 0; k <= tlen - ti; ++k) {
rmatch(ti + k, pj + 1, text, tlen);
}
}
// ? 匹配 0 个或者 1 个字符
else if (pattern[pj] == '?') {
rmatch(ti, pj + 1, text, tlen);
rmatch(ti + 1, pj + 1, text, tlen);
}
// 纯字符匹配才行
else if (ti < tlen && pattern[pj] == text[ti]) {
rmatch(ti + 1, pj + 1, text, tlen);
}
}
}

实际应用

  • 深度优先搜索
  • 正则表达式匹配

思想

贪心算法(又称贪婪算法)是指,在对问题求解时,总是做出在当前看来是最好的选择。也就是说,不从整体最优上加以考虑,它所做出的是在某种意义上的局部最优解。
贪心算法不是对所有问题都能得到整体最优解,关键是贪心策略的选择,选择的贪心策略必须具备无后效性,即某个状态以前的过程不会影响以后的状态,只与当前状态有关。

基本要素

  • 贪心选择
    贪心选择是指所求问题的整体最优解可以通过一系列局部最优的选择,即贪心选择来达到。这是贪心算法可行的第一个基本要素,也是贪心算法与动态规划算法的主要区别。
  • 最优子结构
    当一个问题的最优解包含其子问题的最优解时,称此问题具有最优子结构性质。

过程

  1. 建立数学模型来描述问题
  2. 把求解的问题分成若干个子问题
  3. 对每一子问题求解,得到子问题的局部最优解
  4. 把子问题的解局部最优解合成原来解问题的一个解

实际应用

  • 霍夫曼编码
    霍夫曼编码不仅会考察文本中有多少个不同字符,还会考察每个字符出现的频率,根据频率的不同,选择不同长度的编码。霍夫曼编码试试图用这种不等长的编码方法,来进一步增加压缩的效率。根据贪心的思想,我们可以把出现频率比较多的字符,用稍微短一些的编码;出现频率比较少的字符,用稍微长一些的编码。
    霍夫曼编码示例

我们知道在ReadWriteLock中写锁和读锁是互斥的,也就是如果有一个线程在写共享变量的话,其他线程读共享变量都会阻塞。
StampedLock把读分为了悲观读锁和乐观读,悲观读锁就等价于ReadWriteLock的读锁,而乐观读在一个线程写共享变量时,不会被阻塞,乐观读是不加锁的。所以没锁肯定是比有锁的性能好,这样的话在大并发读情况下效率就更高了!
StampedLock的用法稍稍有点不同,在获取写锁和悲观读锁时,都会返回一个stamp,解锁时需要传入这个stamp,在乐观读时是用来验证共享变量是否被其他线程写过。

使用示例

实现Point

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
// 排他锁-写锁
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

double distanceFromOrigin() { // A read-only method
// 乐观读
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
// 时间戳校验不同过,升级为悲观读锁
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
// 尝试将读锁升级为写锁
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}

UML

ReentrantLock UML

注意事项

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量
  • 如果线程阻塞在StampedLockreadLock()或者writeLock()上时,此时调用该阻塞线程的interrupt()方法,会导致CPU飙升。所以,使用StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()

模板代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();

// 读⼊⽅法局部变量
......

// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读⼊⽅法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}

// 使⽤⽅法局部变量执⾏业务操作
......

long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。
针对读多写少这种并发场景,Java SDK并发包提供了读写锁ReadWriteLock,非常容易使用,并且性能很好。

读写锁

读写锁,并不是Java语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

使用示例

实现缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
**
* 基于ReentrantReadWriteLock实现缓存
*/
class Cache {
private static Map<String, Object> map = new HashMap<>();
private static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private static Lock r = rwl.readLock();
private static Lock w = rwl.writeLock();

public static Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}

public static Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}

public static void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

UML

ReentrantReadWriteLock UML

实现原理

ReentrantReadWriteLock内部维护了一对锁:一个读锁(readerLock)和一个写锁(writerLock)。这两个锁都是基于AQS实现的:它们内部聚合了同一个sync。那么sync如如何同时表示读锁和写锁的呢?ReentrantReadWriteLock通过“按位切割使用”巧妙地将同步状态变量state切分成了读锁和写锁两个部位,即高16位表示写,低16位表示读。

ReentrantReadWriteLock支持公平锁与非公平锁的获取,默认实现为非公平锁,如果要实现公平锁,往构造函数中传参:trues即可。

有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用newCondition()会抛出UnsupportedOperationException异常。

锁升级/锁降级

锁升级(不支持)

同一个线程中,在没有释放读锁的情况下,就去申请写锁,这属于锁升级。下面的代码演示了锁升级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读缓存
r.lock();
try {
v = m.get(key);
if (v == null) {
w.lock();
try {
// 再次验证并更新缓存
// 省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock();
}

代码看上去好像没有问题,先是获取读锁,然后再升级为写锁,但是ReadWriteLock不支持这种锁升级,因为当读锁还没有释放时,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。

锁降级(支持)

同一个线程中,在没有释放写锁的情况下,就去申请读锁,这属于锁降级。下面的代码演示了锁降级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class CachedData {
Object data;
volatile boolean cacheValid;

final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock();
} finally {
// 释放写锁
w.unlock();
}
}

// 此处仍然持有读锁
try {
use(data);
}
finally {
r.unlock();
}
}
}

问题:锁降级中读锁的获取是否有必要?
答:要必要,主要是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,假设此刻另一个线程(T)获取了写锁并修改了数据,那么当前线程是无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,知道当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

ReentrantLock实现了Lock接口,并提供了与synchronized相同的互斥性和内存可见性。与synchronized一样,ReentrantLock还提供了可重入的加锁语义,且同时支持公平锁和非公平锁的获取。ReentrantLock支持在Lock接口中定义的所有锁获取方式,并且与synchronized相比,它还为处理锁的不可用性问题提供了更高的灵活性。

使用示例

实现阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 基于ReentrantLock和Condition实现有界阻塞队列
*
* @param <T>
*/
class BoundedQueue<T> {
/**
* 数组元数
*/
private Object[] items;
/**
* 添加的下标
*/
private int addIndex;
/**
* 删除的下标
*/
private int removeIndex;
/**
* 数组当前数量
*/
private int count;

private Lock lock = new ReentrantLock();

/**
* 数组为空且要移除元素时,await;添加元素时,signal
*/
private Condition notEmpty = lock.newCondition();

/**
* 数组已满且要添加元素时,await;移除元素时,signal
*/
private Condition notFull = lock.newCondition();

public BoundedQueue(int size) {
items = new Object[size];
}

public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
//noinspection unchecked
return (T) x;
} finally {
lock.unlock();
}
}
}

UML

ReentrantLock UML

实现原理

ReentrantLock是通过内部聚合AbstractQueuedSynchronizer的子类来Sync实现并发控制的。在其内部定义了基类Sync及其两个子类NonfairSyncFairSync,以支持公平锁与非公平锁的获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*
* 队列同步器基类,有公平和非公平两个版本
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*
* 公平和非公平子类中,tryLock都会执行非公平获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 等于0时,表示还没线程获取到该锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是已经获取到锁的线程,自增并重置state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 其它线程不可再获取锁
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c=0意味着“锁没有被任何线程锁拥有”,
if (c == 0) {
// 若“锁没有被任何线程锁拥有”
// 则判断“当前线程”是不是CLH队列中的第一个线程线程
// 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

Java SDK并发包通过LockCondition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。

Lock

相较于内置加锁机制synchronized,显示锁Lock除了支持类似synchronized隐式加锁的方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。

UML

Lock

Condition

UML

Condition

使用示例

RPC请求(Dubbo源码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() || cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}

// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}