人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

定义

将任意长度的二进制值串映射为固定长度的二进制值串,这个映射的规则就是哈希算法,而通过原始数据映射之后得到的二进制值串就是哈希值。

要求

  • 从哈希值不能反向推导出原始数据(所以哈希算法也叫单向哈希算法)
  • 对输入数据非常敏感,哪怕原始数据只修改了一个 Bit,最后得到的哈希值也大不相同
  • 散列冲突的概率要很小,对于不同的原始数据,哈希值相同的概率非常小
  • 哈希算法的执行效率要尽量高效,针对较长的文本,也能快速地计算出哈希值

应用

  • 安全加密
  • 唯一标识
  • 数据校验
  • 散列函数、
  • 负载均衡
  • 数据分片

一致性hash

参考:深入浅出一致性Hash原理

散列思想

散列表用的是数组支持按照下标随机访问数据的特性,所以散列表其实就是数组的一种扩展,由数组演化而来。可以说,如果没有数组,就没有散列表。

散列函数

散列函数,顾名思义,它是一个函数。我们可以把它定义成hash(key),其中key表示元素的键值,hash(key)的值表示经过散列函数计算得到的散列值。

基本要求:

  • 散列函数计算得到的散列值是一个非负整数
  • 如果key1 = key2,那hash(key1) == hash(key2)
  • 如果key1 != key2,那hash(key1) != hash(key2)

性能考量

  • 散列函数的设计不能太复杂
  • 散列函数生成的值要尽可能随机并且均匀分布

散列冲突

开放寻址法

  • 线性探测
    当我们往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,我们就从当前位置开始,依次往后查找,看是否有空闲位置,直到找到为止。
  • 二次探测
    所谓二次探测,跟线性探测很像,线性探测每次探测的步长是1,那它探测的下标序列就是hash(key)+0,hash(key)+1,hash(key)+2……而二次探测探测的步长就变成了原来的“二次方”,也就是说,它探测的下标序列就是 hash(key)+0,hash(key)+12,…
  • 双重散列
    所谓双重散列,意思就是不仅要使用一个散列函数。我们使用一组散列函数 hash1(key),hash2(key),hash3(key)……我们先用第一个散列函数,如果计算得到的存储位置已经被占用,再用第二个散列函数,依次类推,直到找到空闲的存储位置。

链表法

在散列表中,每个“桶(bucket)”或者“槽(slot)”会对应一条链表,所有散列值相同的元素我们都放到相同槽位对应的链表中。

对比

相较于链表发,开放寻址法优点在于散列表中的数据都存储在数组中,可以有效地利用 CPU 缓存加快查询速度,另外它不包含指针,序列化起来比较简单,缺点则是用开放寻址法解决冲突的散列表,删除数据的时候比较麻烦,需要特殊标记已经删除掉的数据。而且,在开放寻址法中,所有的数据都存储在一个数组中,比起链表法来说,冲突的代价更高。

当数据量比较小、装载因子小的时候,适合采用开放寻址法。这也是Java中的ThreadLocalMap使用开放寻址法解决散列冲突的原因。

基于链表的散列冲突处理方法比较适合存储大对象、大数据量的散列表,而且,比起开放寻址法,它更加灵活,支持更多的优化策略,比如用红黑树代替链表。

装载因子

散列表的装载因子 = 填入表中的元素个数 / 散列表的长度
装载因子越大,说明空闲位置越少,冲突越多,散列表的性能会下降。不仅插入数据的过程要多次寻址或者拉很长的链,查找的过程也会因此变得很慢。

装载因子阈值的设置要权衡时间、空间复杂度。如果内存空间不紧张,对执行效率要求很高,可以降低负载因子的阈值;相反,如果内存空间紧张,对执行效率要求又不高,可以增加负载因子的值,甚至可以大于1。、、

当散列表的装载因子超过某个阈值时,就需要进行扩容。装载因子阈值需要选择得当。如果太大,会导致冲突过多;如果太小,会导致内存浪费严重。

动态扩容

通过这样均摊的方法,将一次性扩容的代价,均摊到多次插入操作中,就避免了一次性扩容耗时过多的情况。这种实现方式,任何情况下,插入一个数据的时间复杂度都是 O(1)。

概念

所谓“没有共享,就没有伤害”,ThreadLocal 是一个本地线程副本变量工具类,它主要用于将私有线程和该线程存放的副本对象做一个映射,各个线程之间的变量互不干扰,在高并发场景下,可以实现无状态的调用,特别适用于各个线程依赖不同的变量值完成操作的场景。

应用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 实现线程安全的DateFormat
*
* @author cdrcool
*/
class SafeDateFormat {
private static final ThreadLocal<DateFormat> TL =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

public static DateFormat get() {
return TL.get();
}
}

//不同线程执⾏下⾯代码,返回的dateFormat是不同的
DateFormat dateFormat = SafeDateFormat.get();

实现原理

自定义实现

在解释 ThreadLocal 的工作原理之前,我们可以先自己想想:如果让我们来实现 ThreadLocal 的功能,会怎么设计呢?ThreadLocal 的目标是让不同的线程有不同的变量 v,那最直接的方法就是创建一个 Map,它的 Key 是线程,value 是每个线程拥有的变量 V,ThreadLocal 内部持有这样的一个 Map 就可以了。如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
class MyThreadLocal<T> {
Map<Thread, T> locals = new ConcurrentHashMap<>();
//获取线程变量
T get() {
return locals.get(Thread.currentThread());
}
//设置线程变量
void set(T t) {
locals.put(Thread.currentThread(), t);
}
}

Java实现

Java的实现里面也有一个Map,叫做 ThreadLocalMap ,不过持有 ThreadLocalMap 的不是 ThreadLocal,而是 Thread。Thread 这个类内部有一个私有属性 threadLocals,其类型就是 ThreadLocalMap,ThreadLocalMap 的 key 是 ThreadLocal。我们可以结合下面的示意图和精简之后的 Java 实现代码来理解。

ThreaLocalMap示意图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Thread {
// 内部持有ThreadLocalMap
ThreadLocal.ThreadLocalMap threadLocals;
}

class ThreadLocal<T> {
...

public T get() {
// ⾸先获取线程持有的ThreadLocalMap,等价于:ThreadLocalMap map = Thread.currentThread().threadLocals;
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
// 在ThreadLocalMap中查找变量
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

static class ThreadLocalMap {
// Entry定义
static class Entry extends WeakReference<ThreadLocal<?>>{
Object value;
}

...

// 内部是数组⽽不是Map(线性探测法)
Entry[] table;

...

// 根据ThreadLocal查找Entry
private Entry getEntry(ThreadLocal<?> key) {
// 省略查找逻辑
}

...
}
}

初看上去,我们的设计方案和 Java 的实现仅仅是 Map 的持有方不同而已,我们的设计里面 Map 属于 ThreadLocal,而 Java 的实现里面 ThreadLocalMap 则是属于 Thread。这两种方式哪种更合理呢?
很显然 Java 的实现更合理一些。在 Java 的实现方案里面,ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在Thread里面,这样的设计容易理解。而从数据的亲缘性上来讲,ThreadLocalMap 属于 Thread 也更加合理。
当然还有一个更加深层次的原因,那就是不容易产生内存泄露。在我们的设计方案中,ThreadLocal 持有的 Map 会持有 Thread 对象的引用,这就意味着,只要 ThreadLocal 对象存在,那么 Map 中的 Thread 对象就永远
不会被回收。ThreadLocal 的生命周期往往都比线程要长,所以这种设计方案很容易导致内存泄露。而 Java 的实现中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里对 ThreadLocal 的引用还是弱引用(WeakReference),所以只要 Thread 对象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的这种实现方案虽然看上去复杂一些,但是更加安全。

原理

ThreadLocal原理图

ThreadLocal的原理:每个Thread内部维护着一个ThreadLocalMap,它是一个Map。这个映射表的Key是一个弱引用,其实就是ThreadLocal本身,Value是真正存的线程变量Object。
也就是说ThreadLocal本身并不真正存储线程的变量值,它只是一个工具,用来维护Thread内部的Map,帮助存和取。注意上图的虚线,它代表一个弱引用类型,而弱引用的生命周期只能存活到下次GC前。

内存泄漏

Java 的 ThreadLocal 实现应该称得上深思熟虑了,不过即便如此深思熟虑,还是不能百分百地让程序员避免内存泄露,例如在线程池中使用 ThreadLocal,如果不谨慎就可能导致内存泄露。

之所以会内存泄露,是因为在线程池中线程的存活时间太长,往往都是和程序同生共死的,这就意味着 Thread 持有的 ThreadLocalMap 一直都不会被回收,但是 ThreadLocalMap 中的 Entry 对 ThreadLocal 是弱引用(WeakReference),这意味着只要ThreadLocal结束了自己的生命周期是可以被回收掉的(在下次JVM垃圾收集时),但是 Entry 中的 value 却是被 Entry 强引用的,所以即便value的生命周期结束了,value 也是无法被回收的,从而导致内存泄露。

基本保证

Java 做了一些措施来保证 ThreadLocal 尽量不会内存泄漏:在 ThreadLocal 的 get()、set()、remove() 方法调用的时候会清除掉线程 ThreadLocalMap 中所有 Entry 中 key 为 null 的 value,并将整个 Entry 设置为 null,利于下次内存回收。
但这样也并不能保证 ThreadLocal 不会发生内存泄漏,例如:

  • 使用 static 的 ThreadLocal,延长了 ThreadLocal 的生命周期,可能导致的内存泄漏。
  • 分配使用了 ThreadLocal 又不再调用 get()、set()、remove() 方法,那么就会导致内存泄漏。

避免措施

每次使用完 ThreadLocal,都调用它的 remove() 方法手动释放对 value的强引用,清除数据。

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService es;
ThreadLocal tl;
es.execute(() -> {
// ThreadLocal增加变量
tl.set(obj);
try {
// 省略业务逻辑代码
} finally {
// ⼿动清理ThreadLocal
tl.remove();
}
});

InheritableThreadLocal与继承性

通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。也就是说我们在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,那么在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。
如果需要子线程继承父线程的线程变量,我们可以使用 InheritableThreadLocal 来支持这种特性,InheritableThreadLocal 是 ThreadLocal 子类,所以用法和 ThreadLocal相同。
不过,完全不建议大家在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果我们的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。

特征

  1. 堆是一个完全二叉树
  2. 堆中每一个节点的值都必须大于等于(或小于等于)其子树中每个节点的值

之所以要求特征1,是因为完全二叉树要求除了最后一层,其他层的节点个数都是满的,最后一层的节点都靠左排列,这样用数组存储就不会有空隙。
对于每个节点的值都大于等于子树中每个节点值的堆,我们叫作“大顶堆”。对于每个节点的值都小于等于子树中每个节点值的堆,我们叫作“小顶堆”。

堆示例
上图中,1、2是大顶堆,3是小顶堆,4不是顶堆。

存储

根据堆的特征1,堆适用用数组来存储。
堆存储示例
从图中我们可以看到,数组中下标为i的节点的左子节点,就是下标为i ∗ 2i的节点,右子节点就是下标为i ∗ 2 + 1的节点,父节点就是下标为i/2的节点。

堆化

从下往上

让新插入的节点与父节点对比大小。如果不满足子节点小于等于父节点的大小关系,我们就互换两个节点。一直重复这个过程,直到父子节点之间满足刚说的那种大小关系。

示例:
从下往上堆化示例

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Heap {
private int[] a; // 数组,从下标 1 开始存储数据
private int n; // 堆可以存储的最大数据个数
private int count; // 堆中已经存储的数据个数

public Heap(int capacity) {
a = new int[capacity + 1];
n = capacity;
count = 0;
}

public void insert(int data) {
if (count >= n) return; // 堆满了
++count;
a[count] = data;
int i = count;
while (i / 2 > 0 && a[i] > a[i / 2]) { // 自下往上堆化
swap(a, i, i / 2); // swap() 函数作用:交换下标为 i 和 i/2 的两个元素
i = i / 2;
}
}
}

从上往下

最后一个节点放到堆顶,然后利用同样的父子节点对比方法。对于不满足父子节点大小关系的,互换两个节点,并且重复进行这个过程,直到父子节点之间满足大小关系为止。这就是从上往下的堆化方法。

示例:
从上往下堆化示例

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void removeMax() {
if (count == 0) return -1; // 堆中没有数据
a[1] = a[count];
--count;
heapify(a, count, 1);
}

private void heapify(int[] a, int n, int i) { // 自上往下堆化
while (true) {
int maxPos = i;
if (i*2 <= n && a[i] < a[i*2]) maxPos = i*2;
if (i*2+1 <= n && a[maxPos] < a[i*2+1]) maxPos = i*2+1;
if (maxPos == i) break;
swap(a, i, maxPos);
i = maxPos;
}
}

时间复杂度

一个包含n个节点的完全二叉树,树的高度不会超过log2n。
堆化的过程是顺着节点所在路径比较交换的,所以堆化的时间复杂度跟树的高度成正比,也就是O(logn)。
插入数据和删除堆顶元素的主要逻辑就是堆化,所以,往堆中插入一个元素和删除堆顶元素的时间复杂度都是O(logn)。

堆排序

可以把堆排序的过程大致分解成两个大的步骤,建堆和排序。

建堆

有“从下往上堆化”和“从上往下堆化”两种方式,后者效率更高,因为它只需要依次堆化非叶子节点。建堆时间复杂度为O(n)。

从下往上堆化

将下表从1到n的数据依次插入到堆中。

从上往下堆化

因为叶子节点往下堆化只能自己跟自己比较,所以我们直接从第一个非叶子节点开始,依次堆化就行了。

示例:
从上往下建堆示例

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void buildHeap(int[] a, int n) {
for (int i = n/2; i >= 1; --i) {
heapify(a, n, i);
}
}

private static void heapify(int[] a, int n, int i) {
while (true) {
int maxPos = i;
if (i*2 <= n && a[i] < a[i*2]) maxPos = i*2;
if (i*2+1 <= n && a[maxPos] < a[i*2+1]) maxPos = i*2+1;
if (maxPos == i) break;
swap(a, i, maxPos);
i = maxPos;
}
}

排序

建堆结束之后,数组中的数据已经是按照大顶堆的特性来组织的。
数组中的第一个元素就是堆顶,也就是最大的元素。我们把它跟最后一个元素交换,那最大元素就放到了下标为n的位置。

这个过程有点类似上面讲的“删除堆顶元素”的操作,当堆顶元素移除之后,我们把下标为n的元素放到堆顶,然后再通过堆化的方法,将剩下的n−1个元素重新构建成堆。
堆化完成之后,我们再取堆顶的元素,放到下标是n−1的位置,一直重复这个过程,直到最后堆中只剩下标为 111 的一个元素,排序工作就完成了。

示例:
堆排序示例

代码:

1
2
3
4
5
6
7
8
9
10
// n 表示数据的个数,数组 a 中的数据从下标 1 到 n 的位置。
public static void sort(int[] a, int n) {
buildHeap(a, n);
int k = n;
while (k > 1) {
swap(a, 1, k);
--k;
heapify(a, k, 1);
}
}

时间复杂度:O(nlogn)

应用

  • 优先级队列

    • 合并有序小文件
    • 高性能定时器
  • TOP K

  • 中位数

背景

如果“为每个任务分配一个线程”,那么当需要创建大量线程时,会有以下问题:

  • 线程生命周期的开销非常高
    线程的创建与销毁都是需要开销的。如果请求的到达率非常高且请求的处理过程是轻量级的(大多数服务器应用程序就是这种情况),那么为每个请求创建一个新线程将消耗大量的计算资源。
  • 资源消耗增加
    活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用大量内存,给垃圾收集器带来压力,而且大量线程在竞争CPU资源时还将产生其它的性能开销。如果已经有足够多的线程是所有CPU保持忙碌状态,那么再创建更多的宪曾反而会降低性能。
  • 稳定性减低
    在可创建线程的数量上存在一些限制,如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常。

反之,合理地使用线程池不仅能避免上述问题的发生,还具备以下好处:

  • 提高响应速度
    当任务到达,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性
    使用线程池可以对线程进行统一分配、调优和监控。

实现原理

线程池的主要处理流程如下图所示:
线程池任务处理流程图
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务;如果是,则进入下个流程。
  • 线程池判断工作队列是否已满。如果没有满,则将任务存储在工作队列里;如果满了,则进入下个流程。
  • 线程池判判断线程池里的线程是否都处于工作状态。如果不是,则创建一个新的工作线程来执行任务;如果是,则依据饱和策略来处理这个任务。

使用

创建

线程池提供了以下构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize
    线程池的基本大小。
    当提交一个任务到线程池时,如果线程池中线程的个数小于corePoolSize,那么即使其它空闲的基本线程能够执行新任务,线程池也会创建新的线程。
    如果要提前创建并启动所有的基本线程,可以调用线程池的prestartAllCoreThreads方法。
  • maximumPoolSize
    线程池允许创建的最大线程数。
    如果队列满了,并且已创建的线程数小于maximumPoolSize,则线程池会再创建新的线程执行任务。
    需要注意的是,如果使用了无界队列,则该参数就没什么效果。
  • keepAliveTime
    线程池的工作线程空闲后所能保持存活的时间。
    如果任务很多,并且每个任务执行的时间比较短,可以调大时间,以提高线程的利用率。
  • unit
    线程存活时间的单位,其枚举值有:
    • DAYS
    • HOURS
    • MINUTES
    • MILLISECONDS
    • MICROSECONDS
    • NANOSECONDS
  • workQueue
    用于保存等待执行的任务的阻塞队列,可选的队列实现有:
    • ArrayBlockingQueue
      基于数组结构的有界阻塞队列,此队列按FIFO原则(先进先出)对元素进行排序。
    • LinkedBlockingQueue
      基于链表结构的阻塞队列,此队列按FIFO原则对元素进行排序,吞吐量通常要高于ArrayBlockingQueue。
      静态工厂方法Executors.newFixedThreadPool()Executors.newSingleThreadExecutor()使用了这个队列。
    • SynchronousQueue
      不存储元素阻塞队列。每个插入操作必须灯到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
      静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
    • PriorityBlockingQueue
      具有优先级的无限阻塞队列。
      建议使用有界队列,有界队列能增加系统的稳定性和预警能力。
      • threadFactory
        创建线程的工厂。
        可以通过线程工厂给每个创建出来的线程设置更有意义的名字,比如使用guava提供的ThreadFactoryBuilder类:
        1
        new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
        Executors中定义了默认的实现DefaultThreadFactory
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
        poolNumber.getAndIncrement() +
        "-thread-";
        }

        public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
        namePrefix + threadNumber.getAndIncrement(),
        0);
        if (t.isDaemon())
        t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
        }
        }
  • handler
    饱和策略。
    当线程池和队列都满了,说明线程池处于饱和状态,那么必须采取某种策略来处理新提交的任务,可选饱和策略有:
    • AbortPolicy:直接抛出异常。
    • DiscardPolicy:不处理,丢弃掉。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • CallerRunsPolicy:用调用者所在的线程来执行任务。

线程池默认的拒绝策略会throw RejectedExecutionException,这是个运行时异常,对于运行时异常编译器并不强制catch它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

提交任务

  • execute
    1
    2
    3
    4
    5
    6
    7
    /**
    * 在将来某个时候执行给定的任务。任务可以在新线程中执行,也可以在现有的池线程中执行。
    * 如果无法提交任务供执行,要么是因为此执行器已关闭,要么是因为其容量已达到,则由当前{@code RejectedExecutionHandler}处理该任务。
    *
    * @param command the task to execute
    */
    void execute(Runnable command);
  • submit
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * 提交任务并返回一个Future对象
    * 一旦任务执行成功,调用Futrue的get方法将会任务执行结果
    *
    * @param task 提交的任务
    * @return a Future representing pending completion of the task
    */
    <T> Future<T> submit(Callable<T> task);

    /**
    * 提交任务并返回一个Future对象
    * 一旦任务执行成功,调用Futrue的get方法将会返回给定result
    *
    * @param task 提交的任务
    * @param result 返回值
    * @return a Future representing pending completion of the task
    */
    <T> Future<T> submit(Runnable task, T result);

    /**
    * 提交任务并返回一个Future对象
    * 一旦任务执行成功,调用Futrue的get方法将会返回null
    *
    * @param task 提交的任务
    * @return a Future representing pending completion of the task
    */
    <T> Future<T> submit(Runnable task);
  • invokeAll/invokeAny
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    /**
    * 执行给定任务列表,并当所有任务都执行完毕后,返回Future对象列表
    * @param tasks 提交的任务列表
    * @return a list of Futures representing the tasks, in the same sequential order as produced by the iterator for the
    * given task list, each of which has completed
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
    * 执行给定任务列表,并当所有任务都执行完毕后,返回Future对象列表
    * 如果超时,还没有执行完毕的任务会被取消
    * @param tasks 提交的任务列表
    * @return a list of Futures representing the tasks, in the same sequential order as produced by the iterator for the
    * given task list. If the operation did not time out, each task will have completed. If it did time out, some
    * of these tasks will not have completed.
    */
    T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    throws InterruptedException;

    /**
    * 执行给定任务列表,并当其中一个任务执行完毕后,返回Future对象
    * @param tasks 提交的任务列表
    * @return the result returned by one of the tasks
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
    * 执行给定任务列表,并当其中一个任务执行完毕后,返回Future对象
    * 如果超时,抛出TimeoutException
    * @param tasks 提交的任务列表
    * @return the result returned by one of the tasks
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

使用线程池,需要注意异常处理的问题,例如通过ThreadPoolExecutor对象的execute()方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是开发人员却获取不到任何通知,这会让开发人员误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,我们可以参考下面的示例代码。

1
2
3
4
5
6
7
try {
// 业务逻辑
} catch (RuntimeException x) {
// 按需处理
} catch (Throwable x) {
// 按需处理
}

关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 关闭线程池
* 不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务
*/
void shutdown();

/**
* 关闭线程池
* 将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务
*/
void shutdownNow();
```
其原理都是遍历线程池中的工作线程,然后逐个调用线程的`interrupt`方法来中断线程,所以无法响应中断的任务可能永远无法终止。
不过它们也存在一定的区别,`shutdownNow`首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并且等待执行任务的列表,而`shutdown`只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

### 线程池种类
`Executors`提供了一系列静态方法,利用它们我们可以很便捷的创建各种类型的线程池,如`FixedThreadPool`、`SingleThreadExecutor`、`CachedThreadPool`,又如具备调度功能的`ScheduledExecutorService`,以及遵循“工作窃取”模式的`ForkJoinPool`等。

#### FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池,下面是源码实现:
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
  1. FixedThreadPool的corePoolSize和maximumPoolSize都被设置为参数nThreads的值
  2. FixedThreadPool的keepAliveTime参数设置为0L,意味着多余的空闲线程会被立即终止
  3. FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)
  4. 由于使用无界队列,所以maximumPoolSize和keepAliveTime无效,所以有1和2
  5. 由于使用无界队列,所以运行中的FixedThreadPool(未执行shutDown()或shutDownNow)不会拒绝任务(不会调用RejectedExecutionHandler.rejected方法)

SingleThreadExecutor

SingleThreadExecutor是使用单个Worker线程的Executor,下面是源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
  1. SingleThreadExecutor的corePoolSize和maximumPoolSize都被设置为1
  2. SingleThreadExecutor的keepAliveTime参数设置为0L,意味着多余的空闲线程会被立即终止
  3. SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)
  4. 由于使用无界队列,所以maximumPoolSize和keepAliveTime无效,所以有1和2
  5. 由于使用无界队列,所以运行中的FixedThreadPool(未执行shutDown()或shutDownNow)不会拒绝任务(不会调用RejectedExecutionHandler.rejected方法)

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,下面是源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
  1. CachedThreadPool的corePoolSize被设置为0,即corePool为空。
  2. CachedThreadPool的maximumPoolSize被设置为Integer.MAX_VALUE,即线程池是无界的。
  3. CachedThreadPool的keepAliveTime参数设置为60L,意味着多余的线程空闲时间超过60秒将会被终止。
  4. CachedThreadPool使用SynchronousQueue作为线程池的工作队列,它是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程对应的移除操作,反之亦然。
  5. 如果任务提交的速度高于线程池中任务执行的速度时,CachedThreadPool会不断地创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadPool运行示意图如下:
CachedThreadPool运行示意图

  1. 首先执行SynchronousQueue.offer。
  2. 如果当前线程池中有空闲线程正在执行SynchronousQueue.poll,那么主线程执行的offer操作与空闲线程执行的poll操作配对成功主线程把任务交给空闲线程执行。
  3. 如果线程池中没有空闲线程,即没有线程执行SynchronousQueue.poll,此时CachedThreadPool会创建一个新线程执行任务。
  4. 如果执行的是方法SynchronousQueue.poll(long timeout, TimeUnit unit),该操作会让空闲线程最多等待指定的时间,如果指定时间内主线程提交了一个新任务,那么这个线程将执行主线程提交的新任务;否则,这个线程将终止。

监控

如果在系统总大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。在监控线程池的时候可以使用以下属性:

  • taskCount
    线程池需要执行的任务数量
  • completedTaskCount
    线程池在运行过程中已完成的任务数量,小于或等于taskCount
  • largestPoolSize
    线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经瞒过。
  • getPoolSize
    线程池的线程数量。
  • getActiveCount
    获取活动的线程数。

另外还能通过继承线程池来自定义线程池,重写线程池的方法,来实现在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

public CustomThreadPoolExecutor() {
super(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getName() + ": before execute");
super.beforeExecute(t, r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("after execute");
super.afterExecute(r, t);
}

@Override
protected void terminated() {
System.out.println("terminated");
super.terminated();
}
}

@Test
public void test1() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);

CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor();

IntStream.range(0, 10)
.forEach(index -> executor.execute(() -> {
System.out.println(String.format("线程%s运行中", index + 1));
System.out.println("completedTaskCount: " + executor.getCompletedTaskCount());
countDownLatch.countDown();
}));

countDownLatch.await();

executor.shutdown();

System.out.println("completedTaskCount: " + executor.getCompletedTaskCount());
}

配置

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质
    CPU密集型任务、IO密集型任务和混合型任务。
    CPU密集型任务应配置尽可能少的线程,如配置N(CPU个数)+1个线程。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2N个线程。混合型的任务,如果可以拆分,且拆分后的任务的执行时间相差不大,则将其拆分为一个CPU密集型任务和一个IO密集型任务。
    可以通过Runtime.getRuntime.availableProcessors()方法获得当前设备地CPU个数。
  • 任务的优先级
    高、中和低。
    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。
    需要注意的是,如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不会执行。
  • 任务的执行时间
    长、中和短。
    执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
  • 任务的依赖性
    是否依赖其它系统资源,如数据库连接。
    依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

源码分析

首先,看下线程池执行任务的方法execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 在将来的某个时候执行给定的任务。这个任务可能在新线程或现有的池线程中执行。
*
* 如果线程池被关闭,又或者线程池的容量达到了,那么这个任务将无法被提交执行,而是交由当前的{@code RejectedExecutionHandler}处理。
*
* @param command 要执行的任务
* @throws RejectedExecutionException 如果任务不能被接受执行
* @throws NullPointerException 如果{@code command}是null
*/
public void execute(Runnable command) {
// 如果任务为空,抛出空异常
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 如果运行的线程数小于corePoolSize,则创建线程并执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 如果任务能够被添加到队列里(仍需做double-check)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 否则尝试创建新的线程
else if (!addWorker(command, false))
// 如果创建新的线程失败,则拒绝执行任务
reject(command); // is shut down or saturated
}

然后,再看下Worker是怎么执行任务的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环获取工作队列(BlockingQueue<Runnable>)里的任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务之后
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

执行示意图

ThreadPoolExecutor执行示意图

类别

  • 二叉树
    每个节点最多有两个“叉”,也就是两个子节点,分别是左子节点和右节点。
  • 满二叉树
    除了叶子节点之外,每个节点都有左右两个子节点。
  • 完全二叉树
    若设二叉树的深度为h,除第h层外,其它各层(1~h-1)的结点数都达到最大个数,第h层所有的结点都连续集中在最左边。
  • 平衡二叉树
    左右两个子树的高度差的绝对值不超过1,并且左右两个子树都是一棵平衡二叉树。

存储

链式存储法

每个节点有三个字段,其中一个存储数据,另外两个是指向左右子节点的指针。我们只要拎住根节点,就可以通过左右子节点的指针,把整棵树都串起来。
二叉树链式存储法示例

顺序存储法

把根节点存储在下标i = 1的位置,那左子节点存储在下标2 * i = 2的位置,右子节点存储在2 * i + 1 = 3的位置。以此类推。
二叉树顺序存储法示例

遍历

前序遍历

对于树中的任意节点来说,先打印这个节点,然后再打印它的左子树,最后打印它的右子树。

1
2
3
4
5
6
7
8
9
```java
public void preOrder(BinaryTree root) {
if (root == null) {
return;
}
System.out.println(root.data);
preOrder(root.left);
preOrder((root.right));
}

中序遍历

对于树中的任意节点来说,先打印它的左子树,然后再打印这个节点,最后打印它的右子树。

1
2
3
4
5
6
7
8
9
```java
public void inOrder(BinaryTree root) {
if (root == null) {
return;
}
preOrder(root.left);
System.out.println(root.data);
preOrder((root.right));
}

后序遍历

对于树中的任意节点来说,先打印它的右子树,然后再打印它的左子树,最后打印这个节点。

1
2
3
4
5
6
7
8
9
```java
public void postOrder(BinaryTree root) {
if (root == null) {
return;
}
preOrder(root.left);
preOrder((root.right));
System.out.println(root.data);
}

层序遍历

按层遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public List<List<Integer>> levelOrder(TreeNode root) {
if (root == null) return new ArrayList<>(0);

List<List<Integer>> result = new ArrayList<>();

Queue<TreeNode> queue = new LinkedList<TreeNode>();
queue.offer(root);

Queue<TreeNode> curLevelNodes = new LinkedList<TreeNode>();

while (!queue.isEmpty()) {
TreeNode node = queue.poll();
curLevelNodes.offer(node);

if (queue.isEmpty()) {
List<Integer> list = new ArrayList<>(curLevelNodes.size());
while (!curLevelNodes.isEmpty()) {
TreeNode curNode = curLevelNodes.poll();
list.add(curNode.val);

if (curNode.left != null) {
queue.offer(curNode.left);
}

if (curNode.right != null) {
queue.offer(curNode.right);
}

}
result.add(list);
}
}


return result;
}

基本操作

查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public BinaryTree find(BinaryTree tree, int data) {
/*
循环查找
*/
BinaryTree root = tree;

while (root != null) {
if (tree.data == data) {
return tree;
} else if (tree.data < data) {
root = root.right;
} else {
root = root.left;
}
}

return null;

/*
递归查找
*/
// if (tree == null) {
// return null;
// }
// if (tree.data == data) {
// return tree;
// } else if (tree.data < data) {
// return find(tree.right, data);
// } else {
// return find(tree.left, data);
// }
}

插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void insert(BinaryTree tree, int data) {
if (tree == null) {
tree = new BinaryTree(data);
return;
}

BinaryTree root = tree;
while (true) {
if (data > tree.data) {
if (root.right == null) {
root.right = new BinaryTree(data);
return;
}
root = root.right;
} else if (data < tree.data) {
if (root.left == null) {
root.left = new BinaryTree(data);
return;
}
root = root.left;
}
}
}

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public void delete(BinaryTree tree, int data) {
BinaryTree p = tree;
BinaryTree pp = null;

while (p != null && p.data != data) {
pp = p;

if (p.data > data) {
p = p.left;
} else {
p = p.right;
}
}

// 没有找到
if (p == null) {
return;
}

// 如果同时有左节点和右节点,找到右节点中最小的
if (p.left != null && p.right != null) {
BinaryTree minp = p.right;
BinaryTree minpp = p;
while (minp.left != null) {
minpp = minp;
minp = minp.left;
}

p.data = minp.data;

p = minp;
pp = minpp;
}

// 删除节点是叶子节点或者仅有一个子节点
BinaryTree child;
if (p.left != null) {
child = p.left;
} else if (p.right != null) {
child = p.right;
} else {
child = null;
}

// 删除的是根节点
if (pp == null) {
tree = child;
}
// 同时有左节点和右节点,或仅有一个子节点且删除的是左节点
else if (pp.left == p) {
pp.left = child;
}
// 仅有一个子节点且删除的是右节点
else {
pp.right = child;
}
}

重复数据

在前面的操作中,都是假定不存在键值相同的情况。如果存在重复数据,有两种解决方式。

  • 方式一
    一种是二叉查找树中每一个节点不仅会存储一个数据,因此我们通过链表和支持动态扩容的数组等数据结构,把值相同的数据都存储在同一个节点上。

  • 方式二
    另一种是每个节点仍然只存储一个数据。
    在查找插入位置的过程中,如果碰到到一个节点的值,与要插入数据的值相同,我们就将这个要插入的数据放到这个节点的右子树,也就是说,把这个新插入的数据当作大于这个节点的值来处理。
    二叉树插入示例
    当要查找数据的时候,遇到值相同的节点,我们并不停止查找操作,而是继续在右子树中查找,直到遇到叶子节点,才停止。这样就可以把键值等于要查找值的所有节点都找出来。
    二叉树查找示例
    对于删除操作,我们也需要先查找到每个要删除的节点,然后再按前面讲的删除操作的方法,依次删除。
    二叉树删除示例

由来

B+树通常用于实现索引,像Mysql索引就是基于B+树实现的,而索引除了要支持增删改查之外,还需要支持范围查询。并且由于索引通常是存在与磁盘上,这就还要求磁盘访问次数不能过多。

常用数据结构比较

  • 散列表
    查询时间复杂度O(1),不支持区间查询
  • 红黑树
    查询时间复杂度O(lon2n),对其进行中序遍历,能得到从小到大有序的数据序列,但不足以支持区间查询
  • 跳表
    查询时间复杂度O(lon2n),支持区间查询
  • B+树
    查询时间复杂度O(lon2n),支持区间查询

二叉树改造

我们可以对二叉查找树进行改造,从而让其支持区间查询:树中的节点并不存储数据本身,而是只是作为索引。除此之外,我们把每个叶子节点串在一条链表上,链表中的数据是从小到大有序的。经过改造之后的二叉树,就像图中这样:
二叉查找树改造示例

但是,当索引数据量达到几千万甚至上亿的时候,如果将索引存储在内存中,尽管内存访问的速度非常快,查询的效率非常高,但是占用的内存会非常多。
我们可以借助时间换空间的思路,把索引存储在硬盘中。不过磁盘是一个非常慢速的存储设备。通常内存的访问速度是纳秒级别的,而磁盘访问的速度是毫秒级别的。读取同样大小的数据,从磁盘中读取花费的时间,是从内存中读取所花费时间的上万倍,甚至几十万倍。这种将索引存储在硬盘中的方案,尽管减少了内存消耗,但是在数据查找的过程中,需要读取磁盘中的索引,因此数据查询效率就相应降低很多。

那么,接下来优化的重点就是尽量减少磁盘IO操作,也就是尽量降低树的高度:实现m叉树。

特征

  • 每个节点中子节点的个数不能超过 m,也不能小于 m/2
  • 根节点的子节点个数可以不超过 m/2,这是一个例外
  • m叉树只存储索引,并不真正存储数据,这个有点儿类似跳表
  • 通过链表将叶子节点串联在一起,这样可以方便按区间查找
  • 一般情况,根节点会被存储在内存中,其他节点存储在磁盘中

分裂&合并

分裂

不管是内存中的数据,还是磁盘中的数据,操作系统都是按页来读取的,一次会读一页的数据。如果要读取的数据量超过一页的大小,就会触发多次IO操作。所以,我们在选择m大小的时候,要尽量让每个节点的大小等于一个页的大小。读取一个节点,,只需要一次磁盘IO操作。
对于一个B+树来说,m值是根据页的大小事先计算好的,也就是说,每个节点最多只能有m个子节点。在往数据库中写入数据的过程中,这样就有可能使索引中某些节点的子节点个数超过m,这个节点的大小超过了一个页的大小,读取这样一个节点,就会导致多次磁盘IO操作。
我们只需要将这个节点分裂成两个节点。但是,节点分裂之后,其上层父节点的子节点个数就有可能超过 m 个。不过这也没关系,我们可以用同样的方法,将父节点也分裂成两个节点。这种级联反应会从下往上,一直影响到根节点。
B+树节点分裂示例

合并

频繁的数据删除,就会导致某些结点中,子节点的个数变得非常少,长此以往,如果每个节点的子节点都比较少,势必会影响索引的效率。
我们可以设置一个阈值。在B+树中,这个阈值等于m/2。如果某个节点的子节点个数小于m/2,我们就将它跟相邻的兄弟节点合并。不过,合并之后结点的子节点个数有可能会超过m。针对这种情况,我们可以借助插入数据时候的处理方法,再分裂节点。
B+树节点合并示例

b*树

B* 树是B+树的变体,在B+树的非根和非叶子结点再增加指向兄弟的指针;B树定义了非叶子结点关键字个数至少为(2/3)M,即块的最低使用率为2/3(代替B+树的1/2)。

  • B+树的分裂
    当一个结点满时,分配一个新的结点,并将原结点中1/2的数据复制到新结点,最后在父结点中增加新结点的指针;B+树的分裂只影响原结点和父结点,而不会影响兄弟结点,所以它不需要指向兄弟的指针
  • B* 树的分裂
    当一个结点满时,如果它的下一个兄弟结点未满,那么将一部分数据移到兄弟结点中,再在原结点插入关键字,最后修改父结点中兄弟结点的关键字(因为兄弟结点的关键字范围改变了);如果兄弟也满了,则在原结点与兄弟结点之间增加新结点,并各复制1/3的数据到新结点,最后在父结点增加新结点的指针
    所以,B* 树分配新结点的概率比B+树要低,空间使用率更高;
    B星树示例

红黑树是一种自平衡二叉树:即在频繁的动态更新过程中,不会出现树的高度远大于log2n的情况,从而导致各个操作的效率下降。

红黑树具有以下性质:

  1. 节点是红色或黑色
  2. 根节点是黑色
  3. 任何相邻的节点都不能同时为红节点,也就是说,红节点是被黑节点隔开的
  4. 从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点
  5. 每个叶子节点都是黑色的空节点(NIL),也就是说,叶子节点不存储数据

根据性质3可知,最短的可能路径都是黑节点,最长的可能路径都是交替的黑节点和红节点,而性质4要求所有路径都包含相同数目的黑色节点,这就约束强制了红黑树的关键性质:从根节点到叶子节点的最长的可能路径不多于最短的可能路径的两倍长,即树的高度不会大于2lon2n。

我们知道,魔方的复原解法是由固定的算法的:遇到哪几面是什么样子,就对应怎么转几下。红黑树的自平衡过程跟魔方的复原非常神似,大致过程就是:遇到什么样的节点排布,就对应怎么去调整。只要按照这些固定的调整规则来操作,就能将一个非平衡的红黑树调整成平衡的。
上面的性质5:每个叶子节点都是黑色的空节点(NIL),就是为了方便的实现自平衡而添加的约束。

至于红黑树具体是怎样进行自平衡操作的,则首先需要了解两个非常重要的操作:左旋(rotate left)、右旋(rotate righ)。示例:
红黑树左右旋示例

我们可以通过向ExecutorService提交CallableFutureTask来异步获取线程执行结果,但这种方式的缺陷在于Future.get()会阻塞主线程,导致即使和它同时进行的其它线程已经执行完毕,也要等待这个耗时线程执行完才能获取结果,大大影响运行效率。
ExecutorCompletionService聚合了ExecutorBlockingQueue,利用它我们每次都能从阻塞队列中获取到最近执行完毕的futureTask,而避免等待某个耗时较长的任务执行。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* 模拟电商询价系统
*/
@Test
public void enquiry() {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> service = new ExecutorCompletionService<>(executor);

// 异步向电商S1询价
service.submit(this::getPriceByS1);
// 异步向电商S2询价
service.submit(this::getPriceByS2);
// 异步向电商S3询价
service.submit(this::getPriceByS3);

// 获取任务结果
IntStream.range(0, 3).forEach(index -> {
try {
// 按任务执行完毕的顺序,获取任务结果
Future<Integer> future = service.take();
Integer value = future.get();
System.out.println("value: " + value);

// 将询价结果异步保存到数据库
executor.execute(() -> save(value));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}

private Integer getPriceByS1() {
return 100;
}

private Integer getPriceByS2() {
return 200;
}

private Integer getPriceByS3() {
return 300;
}

private void save(Integer value) {
System.out.println("save value: " + value);
}

/**
* 模拟从多个途径获取值,如果有一个途径获取成功,就取消所有任务并返回结果
*/
@Test
public void multiPath() {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> service = new ExecutorCompletionService<>(executor);

// ⽤于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);

// 提交异步任务,并保存future到futures
futures.add(service.submit(this::getPriceByS1));
futures.add(service.submit(this::getPriceByS2));
futures.add(service.submit(this::getPriceByS3));

// 获取最快返回的任务执⾏结果
Integer value = 0;
try {
// 只要有⼀个成功返回,则break
for (int i = 0; i < 3; ++i) {
try {
value = service.take().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

// 简单地通过判空来检查是否成功返回
if (value != null) {
break;
}
}
} finally {
System.out.println("value: " + value);

// 取消所有任务
futures.forEach(future -> future.cancel(true));
}
}

如上,我们通过调用submit(Callable<V> task)方法向CompletionService提交任务,获取任务结果则是先调用take()获取到futureTask,再调用futureTask.get()获取任务执行结果,如果take()没获取到futureTask,主线程就会一直阻塞。

UML

CompletionService UML

实现原理

CompletionService内部聚合了ExecutorBlockingQueue,这样向其提交的任务都会交由Executor执行,任务结果则存放在BlockingQueue中,于是就能利用BlockingQueue的特性,使得在获取任务结果时,如果还没有任务完成,就可以选择阻塞或返回null。
至于任务结果是如何存放到BlockingQueue中的,则是通过将任务包装成QueueingFutureQueueingFuture继承自FutureTask并覆盖了done()方法:task自行完毕后将结果保存到BlockingQueue中。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class ExecutorCompletionService<V> implements CompletionService<V> {
/**
* 线程池,由构造函数传入
*/
private final Executor executor;
/**
* executor类型为AbstractExecutorService,aes的值就是executor,否则为null
*/
private final AbstractExecutorService aes;
/**
* 阻塞队列,线程执行完毕后往该队列中插入值,主线程每次从该队列中获取值
*/
private final BlockingQueue<Future<V>> completionQueue;

/**
* FutureTask extension to enqueue upon completion
* 继承FutureTask,主要是实现了里面的空方法done
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
/**
* futureTask执行完毕后,就往阻塞队列中插入值
*/
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}

private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}

public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}

/**
* 提交任务
*/
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 包装原始task,以实现task执行完毕后,会往阻塞队列中插入task
executor.execute(new QueueingFuture(f));
return f;
}

/**
* 获取执行完毕的任务:从阻塞队列中获取,如果队列为空就会一直阻塞
*/
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}

/**
* 获取执行完毕的任务:从阻塞队列中获取,如果队列为空就会一直阻塞
*/
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

/**
* 获取执行完毕的任务:从阻塞队列中获取,如果队列为空就会返回null
*/
public Future<V> poll() {
return completionQueue.poll();
}

/**
* 获取执行完毕的任务:从阻塞队列中获取,如果队列为空就会先阻塞timeout,超时后还为空就返回null
*/
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}

}

思想

分治算法(divide and conquer)的核心思想其实就是四个字,分而治之 ,也就是将原问题划分成 n 个规模较小,并且结构与原问题相似的子问题,递归地解决这些子问题,然后再合并其结果,就得到原问题的解。

过程

分治算法一般都比较适合用递归来实现。分治算法的递归实现中,每一层递归都会涉及这样三个操作:

  • 分解:将原问题分解成一系列子问题
  • 解决:递归地求解各个子问题,若子问题足够小,则直接求解
  • 合并:将子问题的结果合并成原问题

条件

分治算法能解决的问题,一般需要满足下面这几个条件

  • 原问题与分解成的小问题具有相同的模式
  • 原问题分解成的子问题可以独立求解,子问题之间没有相关性
  • 具有分解终止条件,也就是说,当问题足够小时,可以直接求解
  • 可以将子问题合并成原问题,而这个合并操作的复杂度不能太高,否则就起不到减小算法总体复杂度的效果了

实际应用

  • 归并排序
  • MapReduce