人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

概念

所谓 Guarded Suspension,直译过来就是“保护性地暂停”,如果执行现在的处理会造成问题,就让执行处理的线程进行等待,这种模式通过让线程等待来保证实例的安全性。

比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:“您预订的包间服务员正在收拾,请您稍等片刻。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。

结构图

下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject ,内部有一个成员变量——受保护的对象,以及两个成员方法 —— get(Predicate p) 和 onChanged(T obj) 方法。

![Guarded Suspension模式结构图](/images/java/Guarded Suspension模式结构图.png)

其中,对象 GuardedObject 就是我们前面提到的大堂经理,受保护对象就是餐厅里面的包间;受保护对象的 get() 方法对应的是我们的就餐,就餐的前提条件是包间已经收拾好了,参数 p 就是用来描述这个前提条件的;受保护对象的 onChanged() 方法对应的是服务员把包间收拾好了,通过 onChanged() 方法可以 fire 一个事件,而这个事件往往能改变前提条件 p 的计算结果。下图中,左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。

模板代码

GuardedObject 的内部实现非常简单,是管程的一个经典用法,核心是:get() 方法通过条件变量的 await() 方法实现等待,onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class GuardedObject<T> {
// 受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 1;

// 获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while(!p.test(obj)) {
done.await(timeout,
TimeUnit.SECONDS);
}
}
catch(InterruptedException e) {
throw new RuntimeException(e);
}finally{
lock.unlock();
}
// 返回⾮空的受保护对象
return obj;
}

// 事件通知⽅法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

扩展

Guarded Suspension 模式本质上是一种等待唤醒机制的实现,只不过 Guarded Suspension 模式将其规范化了。规范化的好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个 Bug 来。但 Guarded Suspension 模式在解决实际问题的时候,往往还是需要扩展的。下面以Web请求异步响应为例演示如何扩展。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class GuardedObject<T> {
//受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 2;

// 保存所有GuardedObject
final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();

// 静态⽅法创建GuardedObject
static <K> GuardedObject create(K key) {
GuardedObject go=new GuardedObject();
gos.put(key, go);
return go;
}

static <K, T> void fireEvent(K key, T obj) {
GuardedObject go=gos.remove(key);
if (go != null){
go.onChanged(obj);
}
}

// 获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while(!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch(InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
//返回⾮空的受保护对象
return obj;
}

// 事件通知⽅法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

// 处理浏览器发来的请求
Respond handleWebReq() {
int id = 序号⽣成器.get();
// 创建⼀消息
Message msg1 = new Message(id, "{...}");
// 创建GuardedObject实例
GuardedObject<Message> go = GuardedObject.create(id);

// 发送消息
send(msg1);

// 等待MQ消息
Message r = go.get(t->t != null);
}

void onMessage(Message msg) {
// 唤醒等待的线程
GuardedObject.fireEvent(msg.id, msg);
}

概念

如果现在不适合执行这个操作,或者没必要执行这个操作,就停止处理,直接返回。
Balking 模式与 Guarded Suspension 模式一样,也存在守护条件。在 Balking 模式中,如果守护条件不成立,则立即中断处理。这与 Guarded Suspension 模式有所不同,因为 Guarded Suspension 模式是一直等待至可以运行。

应用示例

文档编辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
boolean changed = false;

// ⾃动存盘操作
void autoSave() {
synchronized(this) {
if (!changed) {
return;
}
changed = false;
}

// 执⾏存盘操作
this.execSave();
}

// 编辑操作
void edit() {
// 省略编辑逻辑
......
change();
}

// 改变状态
void change() {
synchronized(this) {
changed = true;
}
}

总结

Balking 模式和 Guarded Suspension 模式从实现上看似乎没有多大的关系,Balking 模式只需要用互斥锁就能解决,而 Guarded Suspension 模式则要用到管程这种高级的并发原语;但是从应用的角度来看,它们解决的都是“线程安全的if”语义,不同之处在于,Guarded Suspension 模式会等待 if 条件为真,而 Balking 模式不会等待。
Balking 模式的经典实现是使用互斥锁,我们可以使用 Java 语言的内置 synchronized,也可以使用 SDK 提供 Lock;如果我们对互斥锁的性能不满意,可以尝试采用 volatile 方案,不过使用 volatile 方案需要我们更加谨慎。当然我们也可以尝试使用双重检查方案来优化性能,双重检查中的第一次检查,完全是出于对性能的考量:避免执行加锁操作,因为加锁操作很耗时。而加锁之后的二次检查,则是出于对安全性负责。

概念

所谓 Copy-on-Write,经常被缩写为 COW 或者 CoW,顾名思义就是写时复制。不可变对象的写操作往往都是使用 Copy-on-Write 方法解决的,当然Copy-on-Write的应用领域并不局限于 Immutability 模式。

应用实例

Java集合

Java 中的 CopyOnWriteArrayList 和 CopyOnWriteArraySet 这两个 Copy-on-Write 容器,它们背后的设计思想就是 Copy-on-Write。

操作系统

类 Unix 的操作系统中创建进程的 API 是 fork(),传统的 fork() 函数会创建父进程的一个完整副本,例如父进程的地址空间现在用到了 1G 的内存,那么 fork() 子进程的时候要复制父进程整个进程的地址空间(占有 1G 内存)给子进程,这个过程是很耗时的。而 Linux 中的 fork() 函数就聪明得多了,fork() 子进程的时候,并不复制整个进程的地址空间,而是让父子进程共享同一个地址空间;只用在父进程或者子进程需要写入的时候才会复制地址空间,从而使父子进程拥有各自的地址空间。本质上来讲,父子进程的地址空间以及数据都是要隔离的,使用 Copy-on-Write 更多地体现的是一种延时策略,只有在真正需要复制的时候才复制,而不是提前复制好,同时 Copy-on-Write 还支持按需复制,所以 Copy-on-Write 在操作系统领域是能够提升性能的。相比较而言,Java 提供的 Copy-on-Write 容器,由于在修改的同时会复制整个容器,所以在提升读操作性能的同时,是以内存复制为代价的。

Docker容器镜像

分布式源码管理系统Git

函数式编程

Copy-on-Write 最大的应用领域还是在函数式编程领域。函数式编程的基础是不可变性 (Immutability),所以函数式编程里面所有的修改操作都需要 Copy-on-Write 来解决。

路由动态更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 路由信息
public final class Router{
private final String ip;
private final Integer port;
private final String iface;

// 构造函数
public Router(String ip, Integer port, String iface) {
this.ip = ip;
this.port = port;
this.iface = iface;
}

// 重写equals⽅法
public boolean equals(Object obj) {
if (obj instanceof Router) {
Router r = (Router)obj;
return iface.equals(r.iface) && ip.equals(r.ip) && port.equals(r.port);
}
return false;
}

public int hashCode() {
//省略hashCode相关代码
}
}

// 路由表信息
public class RouterTable {
// Key:接⼝名
// Value:路由集合
ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>();

// 根据接⼝名获取路由表
public Set<Router> get(String iface) {
return rt.get(iface);
}

// 删除路由
public void remove(Router router) {
Set<Router> set=rt.get(router.iface);
if (set != null) {
set.remove(router);
}
}

// 增加路由
public void add(Router router) {
Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());
set.add(router);
}
}

概念

解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作。这个办法如此重要,以至于被上升到了一种解决并发问题的设计模式:不变性(Immutability)模式。所谓不变性,简单来讲,就是对象一旦被创建之后,状态就不再发生变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。

实现

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性,所以推荐大家在实际工作中,使用这种更严格的做法。

应用示例

JDK包装类

Java SDK 里很多类都具备不可变性,例如经常用到的 String 和 Long、Integer、Double 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。仔细翻看这些类的声明、属性和方法,我们会发现它们都严格遵守不可变类的三点要求:类和属性都是final的,所有方法均是只读的。以 String 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final class String implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];

...

// 字符替换
public String replace(char oldChar, char newChar) {
if (oldChar != newChar) {
int len = value.length;
int i = -1;
char[] val = value; /* avoid getfield opcode */

// 定位到需要替换的字符位置
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
if (i < len) {
// 创建⼀个buf[],这是关键,⽤来保存替换后的字符串
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ? newChar : c;
i++;
}
// 创建⼀个新的字符串返回,原字符串不会发⽣任何变化
return new String(buf, true);
}
}
// ⽆需替换,直接返回this
return this;
}

...
}

通过分析 String 的实现,不难发现,如果具备不可变性的类,需要提供类似修改的功能,具体该怎么操作呢?做法很简单,那就是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性。

享元模式的运用

如果所有的修改操作都创建一个新的不可变对象,我们可能会担心:是不是创建的对象太多了,有点太浪费内存呢?是的,这样做的确有些浪费,因此我们可以利用享元模式来减少创建对象的数量,从而减少内存占用。

享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑也很简单:创建之前,首先去对象池里看看是不是存在;如果已经存在,就利用对象池里的对象;如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里。

下面我们就以 Long 这个类作为例子,看看它是如何利用享元模式来优化对象的创建的。Long 这个类并没有照搬享元模式,Long 内部维护了一个静态的对象池,仅缓存了[-128,127]之间的数字,这个对象池在 JVM 启动的时候就创建好了,而且这个对象池一直都不会变化,也就是说它是静态的。之所以采用这样的设计,是因为 Long 这个对象的状态共有 2 种,实在太多,不宜全部缓存,而[-128,127]之间的数字利用率最高。下面的示例代码出自 Java 1.8,valueOf() 方法就用到了LongCache 这个缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static Long valueOf(long l) {
final int offset = 128;
// [-128,127]直接的数字做了缓存
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}

// 缓存,等价于对象池仅缓存[-128,127]直接的数字
private static class LongCache {
private LongCache(){}

static final Long cache[] = new Long[-(-128) + 127 + 1];

static {
for(int i = 0; i < cache.length; i++)
cache[i] = new Long(i - 128);
}
}

需要注意的是,正是因为包装类内部用到了享元模式,这会导致看上去私有的锁,其实是共有的,因此基本上所有的基础类型的包装类都不适合做锁,因为它们例如在下面代码中,本意是 A 用锁 al,B 用锁 bl,各自管理各自的,互不影响。但实际上 al 和 bl 是一个对象,结果 A 和 B 共用的是一把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class A {
Long al=Long.valueOf(1);
public void setAX(){
synchronized (al) {
//省略代码⽆数
}
}
}
class B {
Long bl=Long.valueOf(1);
public void setBY(){
synchronized (bl) {
//省略代码⽆数
}
}
}

注意事项

在使用 Immutability 模式的时候,需要注意以下两点:

  1. 对象的所有属性都是 final 的,并不能保证不可变性;
  2. 不可变对象也需要正确发布。

下面我们再看看如何正确地发布不可变对象。不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。例如在下面的代码中,Foo 具备不可变性,线程安全,但是类 Bar 并不是线程安全的,类 Bar 中持有对 Foo 的引用 foo,对 foo 这个引用的修改在多线程中并不能保证可见性和原子性。

1
2
3
4
5
6
7
8
9
10
11
12
// Foo线程安全
final class Foo {
final int age=0;
final int name="abc";
}
// Bar线程不安全
class Bar {
Foo foo;
void setFoo(Foo f) {
this.foo=f;
}
}

如果我们的程序仅仅需要 foo 保持可见性,无需保证原子性,那么可以将 foo 声明为 volatile 变量,这样就能保证可见性。如果我们的程序需要保证原子性,那么可以通过原子类来实现。下面的示例代码是合理库存的原子化实现,其中就是用原子类解决了不可变对象引用的原子性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SafeWM {
class WMRange{
final int upper;
final int lower;
WMRange(int upper,int lower) {
// 省略构造函数实现
}
}

final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0,0));

// 设置库存上限
void setUpper(int v) {
while(true) {
WMRange or = rf.get();
// 检查参数合法性
if(v < or.lower) {
throw new IllegalArgumentException();
}
WMRange nr = new WMRange(v, or.lower);
if(rf.compareAndSet(or, nr)) {
return;
}
}
}
}

无状态

具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法。除了无状态的对象,我们可能还听说过无状态的服务、无状态的协议等等。无状态有很多好处,最核心的一点就是性能。在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好;在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。

概念

顾名思义,两阶段终止模式,就是将终止过程分成两个阶段,其中第一个阶段主要是线程 T1 向线程 T2 发送终止指令,而第二阶段则是线程 T2 响应终止指令。

两阶段终止模式示意图

要透彻理解 Java 语言里的终止指令,需要从 Java 线程的状态转换过程说起,参考下图:
线程状态转换示意图

由上图可知,Java 线程进入终止状态的前提是线程进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态,也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的 interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。
线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法,所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是我们前面提到的第二阶段:响应终止指令。综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。

应用示例

终止采集功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Proxy {
boolean started = false;
// 采集线程
Thread rptThread;

// 启动采集功能
synchronized void start() {
// 不允许同时启动多个采集线程
if (started) {
return;
}
started = true;

rptThread = new Thread(() -> {
// 判断标志位
while (!Thread.currentThread().isInterrupted()) {
// 省略采集、回传实现
report();
// 每隔两秒钟采集、回传⼀次数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// 重新设置线程中断状态
Thread.currentThread().interrupt();
}
}

// 执⾏到此处说明线程⻢上终⽌
started = false;
});

rptThread.start();
}

// 终⽌采集功能
synchronized void stop(){
// 将线程rptThread状态转换到RUNNABLE
rptThread.interrupt();
}
}

需要注意的是,我们在捕获 Thread.sleep() 的中断异常之后,通过 Thread.currentThread().interrupt() 重新设置了线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。

虽然上述实现的确能够解决当前的问题,但是建议大家在实际工作中谨慎使用。原因在于我们很可能在线程的 run() 方法中调用第三方类库提供的方法,而我们没有办法保证第三方类库正确处理了线程的中断异常,例如第三方类库在捕获到 Thread.sleep() 方法抛出的中断异常后,没有重新设置线程的中断状态,那么就会导致线程不能够正常终止。所以强烈建议大家设置自己的线程终止标志位,例如在下面的代码中,使用 isTerminated 作为线程终止标志位,此时无论是否正确处理了线程的中断异常,都不会影响线程优雅地终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Proxy {
// 线程终⽌标志位
volatile boolean terminated = false;

boolean started = false;
// 采集线程
Thread rptThread;

// 启动采集功能
synchronized void start() {
// 不允许同时启动多个采集线程
if (started) {
return;
}
started = true;
terminated = false;

rptThread = new Thread(() -> {
// 判断标志位
while (!terminated) {
// 省略采集、回传实现
report();
// 每隔两秒钟采集、回传⼀次数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// 重新设置线程中断状态
Thread.currentThread().interrupt();
}
}

// 执⾏到此处说明线程⻢上终⽌
started = false;
});

rptThread.start();
}

// 终⽌采集功能
synchronized void stop(){
// 设置中断标志位
terminated = true;

// 中断线程rptThread
rptThread.interrupt();
}
}

如何优雅地终止线程池

线程池提供了两个方法:shutdown() 和 shutdownNow()。
shutdown() 方法是一种很保守的关闭线程池的方法。线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。
而 shutdownNow() 方法,相对就激进一些了,线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任务,已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回。因为 shutdownNow() 方法会中断正在执行的线程,所以提交到线程池的任务,如果需要优雅地结束,就需要正确地处理线程中断。
如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。不过,如果提交到线程池的任务允许后续以补偿的方式重新执行,也是可以使用 shutdownNow() 方法终止线程池的。

分析完 shutdown() 和 shutdownNow() 方法之后,不难发现它们实质上使用的也是两阶段终止模式,只是终止指令的范围不同而已,前者只影响阻塞队列接收任务,后者范围扩大到线程池中所有的任务。

概念

过一个容器来解决生产者和消费者的强耦合关系,生产者生成数据无需等待消费者索取,消费者无需直接索要数据。两者并不进行任何通讯,而是通过容器来进行操作作用:解耦、支持并发、支持忙闲不均。

生产者-消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者-消费者模式的一个示意图:

生产者-消费者模式示意图

优点

  • 解耦
  • 支持异步
  • 平衡生产者和消费者的速度差异

实现方式

公共类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 资源接口
*
* @author cdrcool
*/
public interface Resource<T> {

/**
* 添加资源
*/
void add(T t) throws InterruptedException;

/**
* 获取资源
*/
T remove() throws InterruptedException;
}

/**
* 生产者线程
*
* @author cdrcool
*/
public class ProducerThread extends Thread {
private Resource<Integer> resource;

public ProducerThread(Resource<Integer> resource) {
this.resource = resource;
}

@Override
public void run() {
while (true) {
try {
// 模拟耗时
Thread.sleep(1000);

resource.add(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 消费者线程
*
* @author cdrcool
*/
public class ConsumerThread extends Thread {
private Resource<Integer> resource;

public ConsumerThread(Resource<Integer> resource) {
this.resource = resource;
}

@Override
public void run() {
while (true) {
try {
resource.remove();

// 模拟耗时
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 上下文
*
* @author cdrcool
*/
public class Context {

@Test
public void test() {
Resource<Integer> resource = new Resource1<>(10);

// 生产者
ProducerThread producer = new ProducerThread(resource);

// 多个消费者
ConsumerThread consumer1 = new ConsumerThread(resource);
ConsumerThread consumer2 = new ConsumerThread(resource);
ConsumerThread consumer3 = new ConsumerThread(resource);

producer.start();
consumer1.start();
consumer2.start();
consumer3.start();

try {
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

synchronized及wait和notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 基于synchronized及wait和notify
*
* @author cdrcool
*/
public class Resource1<T> implements Resource<T> {
/**
* 数组元数
*/
private Object[] items;

/**
* 添加的下标
*/
private int addIndex;

/**
* 删除的下标
*/
private int removeIndex;

/**
* 数组当前数量
*/
private int count;

public Resource1(int size) {
items = new Object[size];
}

@Override
public synchronized void add(T t) throws InterruptedException {
while (count == items.length) {
wait();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notifyAll();

System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t));
}

@Override
public synchronized T remove() throws InterruptedException {
while (count == 0) {
wait();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notifyAll();

System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));

//noinspection unchecked
return (T) x;
}
}

Lock和Condition的await和signalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 基于Lock和Condition的await和signalAll
*
* @author cdrcool
*/
public class Resource2<T> implements Resource<T> {
/**
* 数组元数
*/
private Object[] items;

/**
* 添加的下标
*/
private int addIndex;

/**
* 删除的下标
*/
private int removeIndex;

/**
* 数组当前数量
*/
private int count;

/**
* 可重入锁
*/
private Lock lock = new ReentrantLock();

/**
* 数组为空且要移除元素时,await;添加元素时,signal
*/
private Condition notEmpty = lock.newCondition();

/**
* 数组已满且要添加元素时,await;移除元素时,signal
*/
private Condition notFull = lock.newCondition();

public Resource2(int size) {
items = new Object[size];
}

@Override
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();

System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t));
} finally {
lock.unlock();
}
}

@Override
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();

System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));

//noinspection unchecked
return (T) x;
} finally {
lock.unlock();
}
}
}

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 基于BlockingQueue
*
* @author cdrcool
*/
public class Resource3<T> implements Resource<T> {
private BlockingQueue<T> items;

public Resource3(int size) {
items = new LinkedBlockingQueue<>(size);
}

@Override
public void add(T value) throws InterruptedException {
items.put(value);
System.out.println(String.format("生产者%s生产一个资源:%s,当前资源池有%s个资源",
Thread.currentThread().getName(), value, items.size()));
}

@Override
public T remove() throws InterruptedException {
T value = items.take();

System.out.println(String.format("消费者%s消耗一个资源:%s,当前资源池有%s个资源",
Thread.currentThread().getName(), value, items.size()));

return value;
}
}

应用示例

执行批量任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);

// 启动5个消费者线程执⾏批量任务
void start() {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i=0; i<5; i++) {
es.execute(()->{
try {
while (true) {
// 获取批量任务
List<Task> ts = pollTasks();
// 执⾏批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

// 从任务队列中获取批量任务
List<Task> pollTasks() throws InterruptedException {
List<Task> ts = new LinkedList<>();
// 阻塞式获取⼀条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
// ⾮阻塞式获取⼀条任务
t = bq.poll();
}
return ts;
}

// 批量执⾏任务
execTasks(List<Task> ts) {
// 省略具体代码⽆数
}

需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

分阶段提交

利用生产者-消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。考虑实现以下需求:

  1. ERROR级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
    这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面是示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    class Logger {
    // 任务队列
    final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
    // flush批量
    static final int batchSize = 500;
    // 只需要⼀个线程写⽇志
    ExecutorService es = Executors.newFixedThreadPool(1);

    //启动写⽇志线程
    void start(){
    File file = File.createTempFile("foo", ".log");
    final FileWriter writer = new FileWriter(file);
    this.es.execute(()->{
    try {
    // 未刷盘⽇志数量
    int curIdx = 0;
    long preFT = System.currentTimeMillis();
    while (true) {
    LogMsg log = bq.poll(5, TimeUnit.SECONDS);
    // 写⽇志
    if (log != null) {
    writer.write(log.toString());
    ++curIdx;
    }

    //如果不存在未刷盘数据,则⽆需刷盘
    if (curIdx <= 0) {
    continue;
    }

    // 根据规则刷盘
    if (log!=null && log.level==LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis() - preFT > 5000) {
    writer.flush();
    curIdx = 0;
    preFT = System.currentTimeMillis();
    }
    }
    } catch(Exception e) {
    e.printStackTrace();
    } finally {
    try {
    writer.flush();
    writer.close();
    } catch(IOException e) {
    e.printStackTrace();
    }
    }
    });
    }

    // 写INFO级别⽇志
    void info(String msg) {
    bq.put(new LogMsg(LEVEL.INFO, msg));
    }

    // 写ERROR级别⽇志
    void error(String msg) {
    bq.put(new LogMsg(LEVEL.ERROR, msg));
    }
    }

    // ⽇志级别
    enum LEVEL {
    INFO,
    ERROR
    }

    class LogMsg {
    LEVEL level;
    String msg;
    // 省略构造函数实现
    LogMsg(LEVEL lvl, String msg){}
    // 省略toString()实现
    String toString(){}
    }

总结

Java 语言提供的线程池本身就是一种生产者-消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。
生产者-消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,我们可以借助分布式消息队列(MQ)来实现生产者-消费者模式。MQ一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,我们可以结合观察者模式实现广播功能。

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

使用示例

为了领略CompletableFuture异步编程的优势,这里我们用CompletableFuture来模拟烧水泡茶程序。在下面的程序中,我们分了3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。这个分工如下图所示:
烧水泡茶分工方案

下面是实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 模拟烧开水泡茶
* 任务1:洗⽔壶 -> 烧开⽔
* 任务2:洗茶壶 -> 洗茶杯 -> 拿茶叶
* 任务3:任务1和任务2完成后执⾏:泡茶
*/
@Test
public void makeTea() {
// 任务1:洗⽔壶 -> 烧开⽔
CompletableFuture<Void> future1 =
CompletableFuture.runAsync(() -> {
System.out.println("T1:洗⽔壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开⽔...");
sleep(15, TimeUnit.SECONDS);
});

// 任务2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> future2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "⻰井";
});

// 任务3:任务1和任务2完成后执⾏:泡茶
CompletableFuture<String> future3 =
future1.thenCombine(future2, (result1, result2) -> {
System.out.println("T1:拿到茶叶:" + result2);
System.out.println("T1:泡茶...");
return "上茶:" + result2;
});

// 等待任务3执⾏结果
System.out.println(future3.join());
}

private void sleep(int t, TimeUnit unit) {
try {
unit.sleep(t);
} catch (InterruptedException e) {
}
}

观察代码不难发现:

  • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  • 语义更清晰,例如future3 = future1.thenCombine(future2, (result1, result2) -> {})能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
  • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

UML

CompletableFuture UML

CompletableFuture同时实现了Future接口和CompletionStage接口。Future接口比较好理解,它代表一个异步计算的结果,并且提供了一些方法来让调用者检测异步过程是否完成,或者取得异步计算的结果,或者取消正在执行的异步任务。CompletionStage接口则有点复杂(里面定义了38个方法),在后面会单独介绍。

创建

创建CompletableFuture对象主要是通过调用以下4个静态方法:

1
2
3
4
5
6
7
static CompletableFuture<Void> runAsync(Runnable runnable);

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

runAsyncsupplyAsync之间的区别是:Runnable接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的;前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下,CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家要根据不同的业务类型创建不同的线程池,以避免互相干扰。

创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,我们需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为CompletableFuture类实现了Future接口,所以这两个问题我们为都可以通过Future接口来解决。另外,CompletableFuture类还实现了CompletionStage接口,这个接口内容实在是太丰富了,接口里的方法我们该如何理解呢?

CompletionStage

我们可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系并行关系汇聚关系等。以前面烧水泡茶的例子来说明,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。
烧水泡茶任务间关系

CompletionStage接口可以清晰地描述任务之间的这种时序关系,例如前面提到的future3 = future1.thenCombine(future2, (result1, result2) -> {})描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种AND聚合关系,这里的AND指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有AND聚合关系,那就一定还有OR聚合关系,所谓OR指的是依赖的任务只要有一个完成就可以执行当前任务。
在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage接口也可以方便地描述异常处理。
下面我们就来一一介绍,CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理。

串行关系

CompletionStage接口里面描述串行关系,主要是thenApplythenAcceptthenRunthenCompose系列的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

<U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);

<U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

CompletionStage<Void> thenAccept(Consumer<? super T> action);

CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

CompletionStage<Void> thenRun(Runnable action);

CompletionStage<Void> thenRunAsync(Runnable action);

CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);

<U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

<U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

<U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);

需要注意的是thenCompose系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。

AND汇聚关系

CompletionStage接口里面描述AND汇聚关系,主要是thenCombinethenAcceptBothrunAfterBoth系列的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

<U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

<U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

<U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);

<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);

<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);

CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

OR汇聚关系

CompletionStage接口里面描述OR汇聚关系,主要是applyToEitheracceptEitherrunAfterEither系列的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);

<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);

<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);

CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);

CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);

CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);

CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);

CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

异常处理

CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

<U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

全局锁

全局锁就是对整个数据库实例枷锁,命令是:Flush tables with read lock(FTWRL)。当需要让整个库处于只读状态的时候,可以使用这个命令,之后其他线程的以下语句会被阻塞:数据更新语句(数据的增删改)、数据定义语句(包括建表、修改表结构等)和更新类事务的提交语句。

全局锁的典型使用场景是,做全库逻辑备份。

但是让整库都只读,听上去就很危险:

  • 如果你在主库上备份,那么在备份期间都不能执行更新,业务基本上就得停摆
  • 如果你在从库上备份,那么备份期间从库不能执行主库同步过来的 binlog,会导致主从延迟

因此可以使用可重复读隔离级别来获取一致性视图。不过对于 MyISAM 这种不支持事务的引擎,还是只能使用 FTWRL。

不建议使用set global readonly=true,确实 readonly 方式也可以让全库进入只读状态,但我还是会建议你用 FTWRL 方式,主要有两个原因:

  • 在有些系统中,readonly 的值会被用来做其他逻辑,比如用来判断一个库是主库还是备库。因此,修改 global 变量的方式影响面更大。
  • 在异常处理机制上有差异。如果执行 FTWRL 命令之后之后由于客户端发生异常断开,那么MySQL会自动释放这个全局锁,整个库回到可以正常更新的状态。而将整个库设置为 readonly 之后,如果客户端发生异常,则数据库就会一直保持 readonly 状态,这样会导致整个库长时间处于不可写状态,风险较高。

表级锁

MySQL 里面表级别的锁有两种:一种是表锁,一种是元数据锁(meta data lock,MDL)。

表锁的语法是 lock tables … read/write。与 FTWRL 类似,可以用 unlock tables 主动释放锁,也可以在客户端断开的时候自动释放。需要注意,lock tables 语法除了会限制别的线程的读写外,也限定了本线程接下来的操作对象。

另一类表级的锁是 MDL(metadata lock)。MDL 不需要显式使用,在访问一个表的时候会被自动加上。MDL 的作用是,保证读写的正确性。

当对一个表做增删改查操作的时候,加 MDL 读锁;当要对表做结构变更操作的时候,加 MDL 写锁。

  • 读锁之间不互斥,因此你可以有多个线程同时对一张表增删改查。
  • 读写锁之间、写锁之间是互斥的,用来保证变更表结构操作的安全性。因此,如果有两个线程要同时给一个表加字段,其中一个要等另一个执行完才能开始执行。

行锁

行锁就是针对数据表中行记录的锁。这很好理解,比如事务 A 更新了一行,而这时候事务 B 也要更新同一行,则必须等事务A的操作完成后才能进行更新。

如果事务中需要锁多个行,要把最可能造成锁冲突、最可能影响并发度的锁尽量往后放。

作用

索引其实就是为了提高数据查询的效率,就像书的目录一样。

常见模型

  • 哈希表
  • 有序数组
  • 二叉搜索树

特性:

  • 哈希表这种结构适用于只有等值查询的场景
  • 有序数组虽然在等值查询和范围查询场景中的性能就都非常优秀,但有序数组索引只适用于静态存储引擎
  • 由于索引不止存在内存中,还要写到磁盘上,因此若使用二叉搜索树,会因为频繁访问磁盘导致查询效率低下

因此,MySQL 是使用 B+ 树来实现索引的

InnoDB 索引模型

假设,我们有一个主键列为 ID 的表,表中有字段 k,并且在 k 上有索引。这个表的建表语句是:

1
2
3
4
5
6
create table T(
id int primary key,
k int not null,
name varchar(16),
index (k)
) engine=InnoDB;

对应的索引组织结构如下:
InnoDB 索引组织结构示例

索引类型

普通索引

最基本的索引,它没有任何限制,用于加速查询。

唯一索引

索引列的值必须唯一,但允许有空值。如果是组合索引,则列值的组合必须唯一。
由于唯一索引用不上 change buffer 的优化机制,因此如果业务可以接受,从性能角度出发我建议你优先考虑非唯一索引。

主键索引

是一种特殊的唯一索引,一个表只能有一个主键,不允许有空值。一般是在建表的时候同时创建主键索引。

主键索引的叶子节点存的是整行数据。在 InnoDB 里,主键索引也被称为聚簇索引(clustered index)。
非主键索引的叶子节点内容是主键的值。在 InnoDB 里,非主键索引也被称为二级索引(secondary index)。

由于基于非主键索引的查询需要多扫描一棵索引树(回表),因此我们在应用中应该尽量使用主键查询。

组合索引

指多个字段上创建的索引,只有在查询条件中使用了创建索引时的第一个字段,索引才会被使用。使用组合索引时遵循最左前缀集合。

全文索引

主要用来查找文本中的关键字,而不是直接与索引中的值相比较。

性能影响

  • insert 索引个数越多,对于 insert 操作来说,维护的成本就越大,插入一条数据的速度也就越慢。

  • delete delete 操作刚好和 insert 相反,当删除一条数据时,会把这条数据涉及到的多个索引中的数据删除。开销要比 insert 小。

  • update 这个操作不同于 insert、delete,只有当 update 的这个字段涉及到索引时,才需要维护索引,相对来说开销要小一些。

索引维护

页分裂

B+ 树为了维护索引有序性,在插入新值的时候需要做必要的维护。如果插入的数据所在的数据页已经满了,根据 B+ 树的算法,这时候需要申请一个新的数据页,然后挪动部分数据过去。这个过程称为页分裂。

页合并

相邻两个页由于删除了数据,利用率很低之后,会将数据页做合并。合并的过程,可以认为是分裂过程的逆过程。

主键的选择

如果使用自增主键,那么每次插入一条新记录,都是追加操作,都不涉及到挪动其他记录,也不会触发叶子节点的分裂。而有业务逻辑的字段做主键,则往往不容易保证有序插入,这样写数据成本相对较高。
另外,主键长度越小,普通索引的叶子节点就越小,普通索引占用的空间也就越小。
所以,从性能和存储空间方面考量,自增主键往往是更合理的选择。

业务主键适用于以下场景:

  • 只有一个索引
  • 该索引必须是唯一索引

覆盖索引

  • 解释一:就是 select 的数据列只用从索引中就能够取得,不必从数据表中读取,换句话说查询列要被所使用的索引覆盖。
  • 解释二:索引是高效找到行的一个方法,当能通过检索索引就可以读取想要的数据,那就不需要再到数据表中读取行了。如果一个索引包含了(或覆盖了)满足查询语句中字段与条件的数据就叫做覆盖索引。
  • 解释三:是非聚集组合索引的一种形式,它包括在查询里的 Select、Join 和 Where 子句用到的所有列(即建立索引的字段正好是覆盖查询语句 select 子句 与查询条件 Where 子句中所涉及的字段,也即,索引包含了查询正在查找的所有数据)。

最左前缀原则

顾名思义是最左优先,以最左边的为起点任何连续的索引都能匹配上。
注:如果第一个字段是范围查询需要单独建一个索引。
注:在创建多列索引时,要根据业务需求,where 子句中使用最频繁的一列放在最左边。
当创建 (a,b,c) 复合索引时,想要索引生效的话,只能使用 a 和 a,b 和 a,b,c 三种组合。

索引下推

索引条件下推(ICP)是对 MySQL 使用索引从表中检索行的情况的优化。如果没有 ICP,存储引擎会遍历索引以查找基表中的行,并将它们返回给 MySQL 服务器,该服务器会评估 WHERE 行的条件。启用 ICP 后,如果 WHERE 只使用索引中的列来评估部分条件,MySQL 服务器会推送这部分内容。WHERE 条件下到存储引擎。然后,存储引擎通过使用索引条目来评估推送的索引条件,并且仅当满足该条件时才从表中读取行。ICP 可以减少存储引擎必须访问基表的次数以及 MySQL 服务器必须访问存储引擎的次数。

重建索引

对于普通索引,可以通过执行下面两行来重建索引:

1
2
alter table T drop index k;
alter table T add index(k);

但是对于主键索引,如果也是执行下面两行来重建索引:

1
2
alter table T drop primary key;
alter table T add primary key(id);

则不够合理,这是因为不论是删除主键还是创建主键,都会将整个表重建。所以连着执行这两个语句的话,第一个语句就白做了。
可以改为执行alter table T engine=InnoDB

字符串添加索引

  • 直接创建完整索引,这样可能比较占用空间
  • 创建前缀索引,节省空间,但会增加查询扫描次数,并且不能使用覆盖索引
  • 倒序存储,再创建前缀索引,用于绕过字符串本身前缀的区分度不够的问题
  • 创建 hash 字段索引,查询性能稳定,有额外的存储和计算消耗,跟第三种方式一样,都不支持范围扫描

事务基本要素:ACID

  • 原子性(Atomicity)
    事务的原子性是指事务必须是一个原子的操作序列单元。事务中包含的各项操作在一次执行过程中,只允许出现以下两种状态之一:全部成功执行、全部不执行。任何一项操作失败都将导致整个事务失败,同时其他已经被执行的操作都将被撤销并回滚,只打所有的操作全部成功,整个事务才算是成功完成。

  • 一致性(Consistency)
    事务的一致性是指事务的执行不能破坏数据库数据的完整性和一致性,一个事务在执行之前和执行之后,数据库都必须处于一致性状态。也就是说,事务执行的结果必须是使数据库从一个一致性状态转变到另一个一致性状态,因此当数据库只包含成功事务提交的结果时,就能说数据库处于一致性状态。而如果数据库系统在运行过程中发生故障, 有些事务尚未完成就被迫中断,这些未完成的事务对数据库所做的修改有一部分已写入物理数据库,这时数据库就处于一种不正确的状态,或者说是不一致的状态。

  • 隔离性(Isolation)
    事务的隔离性是指在并发环境中,并发的事务是相互隔离的,一个事务的执行不能被其他事务干扰。也就是说,不同的事务并发操纵相同的数据时,每个事务都有各自完整的数据空间,即一个事务内部的操作及使用的数据对其他并发事务是隔离的,并发执行的 各个事务之间不能互相干扰。

  • 持久性(Durability)
    事务的持久性也被称为永久性,是指一个事务一旦提交,它对数据库中对应数据的状态变更就应该是永久性的。换句话说,一旦某个事务成功结束,那么它对数据库所做的更新就必须被永久保存下来——即使发生系统崩溃或机器宕机等故障,只要数据库能够重新启动,那么一定能够将其恢复到事务成功结束时的状态。

脏读、幻读、不可重复读

  • 脏读
    指一个事务 A 正在访问数据,并且对该数据进行了修改,但是这种修改还没有提交到数据库中(也可能因为某些原因 Rollback了)。这时候另外一个事务 B 也访问这个数据,然后使用了这个被 A 修改的数据,那么这个数据就是脏的,并不是数据库中真实的数据。这就被称作脏读。

解决办法:把数据库事务隔离级别调整到 READ_COMMITTED。

  • 不可重复读
    指在一个事务 A 内,多次读同一个数据,但是事务 A 没有结束时,另外一个事务 B 也访问该同一数据。那么在事务 A 的两次读数据之间,由于事务 B 的修改导致事务 A 两次读到的数据可能是不一样的。这就发生了在一个事务内两次读到的数据不一样,这就被称作不可重复读。

解决办法:把数据库事务隔离级别调整到 REPEATABLE_READ。

  • 幻读
    指一个事务 A 对一个表中的数据进行了修改,而且该修改涉及到表中所有的数据行;同时另一个事务 B 也在修改表中的数据,该修改是向表中插入一行新数据。那么经过这一番操作之后,操作事务 A 的用户就会发现表中还有没修改的数据行,就像发生了幻觉一样。这就被称作幻读。

解决办法:把数据库事务隔离级别调整到 SERIALIZABLE_READ。

隔离级别

  • 读未提交(read uncommitted)
    一个事务还没提交时,它做的变更就能被别的事务看到。

  • 读提交(read committed)
    一个事务提交之后,它做的变更才能被别的事务看到。

  • 可重复读(repeatable read)
    一个事务执行过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。当然在可重复隔离级别下,未提交变更对其它事务也是不可见的。

  • 串行化(serializable)
    对于同一行记录,“写”会加“写锁”,“读”会加“读锁”。当出现读写锁冲突的时候,后访问的事务必须等前一个事务执行完成,才能继续执行。

自动提交

可以通过执行 set autocommit 1|0 来设置事务是否自动提交。
MySQL 默认为 1,表示开启自动提交。
如果没有开启自动提交,当前 session 所连接的 MySQL 的所有操作都会当成一个事务,直到输入 rollback/commit,当前事务才算结束。当前事务结束前新的 MySQL 连接时无法读取到任何 session 的操作的结果的。
如果开起了,mysql 会把每个sql语句当成一个事务然后自动的 commit。
当然无论开启与否,start transaction commit|rollback 都是独立的事务。