人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

  1. 创建 network

    1
    docker network create --driver bridge --subnet 172.22.0.0/16 --gateway 172.22.0.1  op_net
  2. 创建 zookeeper.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    version: '3.1'

    services:
    zk-1:
    image: zookeeper
    restart: always
    hostname: zk-1
    container_name: zk-1
    ports:
    - 2181:2181
    environment:
    ZOO_MY_ID: 1
    ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk-2:2888:3888 server.3=zk-3:2888:3888
    networks:
    default:
    ipv4_address: 172.22.0.11

    zk-2:
    image: zookeeper
    restart: always
    hostname: zk-2
    container_name: zk-2
    ports:
    - 2182:2181
    environment:
    ZOO_MY_ID: 2
    ZOO_SERVERS: server.1=zk-1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk-3:2888:3888;
    networks:
    default:
    ipv4_address: 172.22.0.12

    zk-3:
    image: zookeeper
    restart: always
    hostname: zk-3
    container_name: zk-3
    ports:
    - 2183:2181
    environment:
    ZOO_MY_ID: 3
    ZOO_SERVERS: server.1=zk-1:2888:3888 server.2=zk-2:2888:3888 server.3=0.0.0.0:2888:3888
    networks:
    default:
    ipv4_address: 172.22.0.13

    networks:
    default:
    external:
    name: op_net
  3. 创建 kafka.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    version: '3.1'

    services:
    kafka-1:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka-1
    container_name: kafka-1
    ports:
    - 9092:9092
    environment:
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.11.155.23:9092
    KAFKA_ZOOKEEPER_CONNECT: zk-1:2181,zk-2:2181,zk-3:2181
    external_links:
    - zk-1
    - zk-2
    - zk-3
    networks:
    default:
    ipv4_address: 172.22.0.14

    kafka-2:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka-2
    container_name: kafka-2
    ports:
    - 9093:9092
    environment:
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.11.155.23:9093
    KAFKA_ZOOKEEPER_CONNECT: zk-1:2181,zk-2:2181,zk-3:2181
    external_links:
    - zk-1
    - zk-2
    - zk-3
    networks:
    default:
    ipv4_address: 172.22.0.15

    kafka-3:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka-3
    container_name: kafka-3
    ports:
    - 9094:9092
    environment:
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.11.155.23:9094
    KAFKA_ZOOKEEPER_CONNECT: zk-1:2181,zk-2:2181,zk-3:2181
    external_links:
    - zk-1
    - zk-2
    - zk-3
    networks:
    default:
    ipv4_address: 172.22.0.16

    networks:
    default:
    external:
    name: op_net
  4. 启动 ZooKeeper 集群

    1
    docker-compose -f zookeeper.yml up -d
  5. 启动 Kafka 集群

    1
    docker-compose -f kafka.yml up -d

参考资料:

  1. docker安装zookeeper集群
  2. docker安装kafka集群

修改站点信息

打开 _config.yml 文件,修改其中的 Site 下的 title 等属性:

1
2
3
4
5
6
7
8
# Site
title: 雨中的了悟
subtitle: ''
description: ''
keywords:
author: 晴天
language: zh-CN
timezone: ''

更换主题

Hexo 默认使用 landscape 主题,同时支持配置为其它主题,这里以配置为 next 主题来介绍如何操作。

  1. 访问 https://github.com/theme-next/hexo-theme-next
  2. 在根目录下执行命令:git clone https://github.com/theme-next/hexo-theme-next themes/next
  3. 打开 _config.yml 文件,将其中的 theme 属性由 landscape 改为 next

next 主题默认只开放了 首页 和 归档 两个菜单,且在侧边栏中只有 日志 链接可以点击查看详情,现在我想添加 分类 菜单,同时让侧边栏中的 分类 和 标签 链接都能点击。

添加分类菜单

打开 \themes\next_config.yml 文件,将其中的 menu 属性下的 categories 属性前面的注释取消掉。

创建分类和标签链接

首先,执行以下两行命令:

1
2
hexo new page "categories"
hexo new page "tags"

之后,可以发现在 source 目录下多出了 categories 和 tags 两个子目录,然后分别修改这两个子目录中的 index.md 文件:

  • categories

    1
    2
    3
    type: categories
    date: 2020-01-15 11:24:00
    comments: false
  • tags

    1
    2
    3
    4
    type: tags
    type: tags
    date: 2020-01-15 11:24:00
    comments: false

部署到 GitHub

  1. 访问 https://github.com/hexojs/hexo-deployer-git
  2. 在根目录下执行命令:npm install hexo-deployer-git --save
  3. 打开 _config.yml 文件,在文件末尾添加以下配置:
    1
    2
    3
    4
    5
    deploy:
    type: git
    repo: git@github.com:cdrcool/cdrcool.github.io.git
    # repo: https://github.com/cdrcool/cdrcool.github.io
    branch: master
  4. 执行命令:hexo d -g

集成 Gitalk

  1. 访问 https://github.com/settings/developers
  2. 创建 OAuth App:
    OAuth App 创建示例
  3. 打开 \themes\next_config.yml 文件,配置 gittalk 相关属性:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # Gitalk
    # For more information: https://gitalk.github.io, https://github.com/gitalk/gitalk
    gitalk:
    enable: true
    github_id: cdrcool # GitHub repo owner
    repo: cdrcool.github.io # Repository name to store issues
    client_id: 5f0b1c1d42822204f803 # GitHub Application Client ID
    client_secret: 56818d47fc75f1f4999acd7a3356879bbade310f # GitHub Application Client Secret
    admin_user: cdrcool # GitHub repo owner and collaborators, only these guys can initialize gitHub issues
    distraction_free_mode: true # Facebook-like distraction free mode
    # Gitalk's display language depends on user's browser or system environment
    # If you want everyone visiting your site to see a uniform language, you can set a force language value
    # Available values: en | es-ES | fr | ru | zh-CN | zh-TW
    language: zh-CN

注意:文件名中中英文之间不能带有空格,不然授权成功后回调时会报 redirect_uri_mismatch 异常。

启用打赏

打开 \themes\next_config.yml 文件,配置 reward 相关属性:

1
2
3
4
5
6
7
8
9
10
11
12
# Reward (Donate)
# Front-matter variable (unsupport animation).
reward_settings:
# If true, reward will be displayed in every article by default.
enable: true
animation: true
comment: 小礼物走一走,来 Github 关注我

reward:
wechatpay: /images/wechatpay.png
alipay: /images/alipay.png
#bitcoin: /images/bitcoin.png

问题

大小写导致404

打开 .deploy_git.git\config,将 ignorecase 属性的值由 true 改为 false,然后删除 .deploy_git 下所有文件,最后重新生成后部署:

1
2
hexo clean
hexo deploy -generate

Gitment

Error: Not Found

检查 github_user 和 github_repo 是否配置正确

Error: Validation Failed

打开 \themes\next\layout_third-party\comments\gitment.swig,找到以下代码,将id值由 window.location.pathname 改为 decodeURI(window.location.pathname)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function renderGitment() {
var gitment = new {{ CommentsClass }}({
id: window.location.pathname,
owner: '{{ theme.gitment.github_user }}',
repo: '{{ theme.gitment.github_repo }}',
{% if theme.gitment.mint %}
lang: '{{ theme.gitment.language }}' || navigator.language || navigator.systemLanguage || navigator.userLanguage,
{% endif %}
oauth: {
{% if theme.gitment.mint and theme.gitment.redirect_protocol %}
redirect_protocol: '{{ theme.gitment.redirect_protocol }}',
{% endif %}
{% if theme.gitment.mint and theme.gitment.proxy_gateway %}
proxy_gateway: '{{ theme.gitment.proxy_gateway }}',
{% else %}
client_secret: '{{ theme.gitment.client_secret }}',
{% endif %}
client_id: '{{ theme.gitment.client_id }}'
}
});
gitment.render('gitment-container');
}

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

Quick Start

1
2
3
4
5
npm install hexo-cli -g
hexo init blog
cd blog
npm install
hexo server

概念

LMS,即 log-Structured Merge-Trees,其本质就是在读写之间取得平衡,和 B+ 树相比,它牺牲了部分读性能,用来大幅提高写性能。

原理

LSM 的原理是把一颗大树拆分成 N 棵小树,它首先写入到内存中(内存没有寻道速度的问题,随机写的性能得到大幅提升),在内存中构建一颗小树,随着小树越来越大,内存的小树会 flush 到磁盘上。当读时,由于不知道数据在哪颗小树上,因此必须遍历所有的小树,但在每颗小树内部数据是有序的。

优化

LSM Tree 弄了很多个小的有序结构,比如每 m 个数据,在内存里排序一次,下面 m 个数据,再排序一次……这样依次做下去,就可以获得 N/m 个有序的小的有序结构。
在查询的时候,因为不知道这个数据到底是在哪里,所以就从最新的一个小的有序结构里做二分查找,找得到就返回,找不到就继续找下一个小有序结构,一直到找到为止。
很容易可以看出,这样的模式,读取的时间复杂度是 (N/m) * log2N。读取效率是会下降的。
这就是最本来意义上的 LSM tree 的思路。那这样做,性能还是比较慢的,于是需要再做些事情来提升。

Bloom filter

Bloom filter 是一种随机数据结构,可以在 O(1) 时间内判断一个给定的元素是否在集合中。False positive 是可能的,但是 false negative 是不可能的。
使用 Bloom filter,可以快速的知道某一个小的有序结构里有没有指定的那个数据的。于是就可以不用二分查找,而只需简单的计算几次就能知道数据是否在某个小集合里。效率得到了提升,但付出的是空间代价。

Compaction

如果我们一直对 memtable 进行写入,memtable 就会一直增大知道超出服务器的内部限制。所以我们需要把 memtable 的内存数据放到 durable storage
上去,生成 SSTable 文件,这就做 minor compaction。

  • Minor compaction
    把 memtable 的内容写到一个 SSTable。目的是减少内存消耗,另外减少数据恢复时需要从日志读取的数据量。

  • Merge compaction
    把几个连续 level 的 SSTable 和 memtable 合并成一个 SSTable。目的是减少读操作要读取的 SSTable 数量。

  • Major compaction
    合并所有 level 上的 SSTable 的 merge compaction。目的在于彻底删除 tombstone 数据,并释放存储空间。

![LSM Compaction示例](/images/algorithm/LSM Compaction示例.jpg)

概念

Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork 就是把一个大任务切分为多干个子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。Fork/Join 的运行流程图如下所示:

分治任务模型图

实现原理

ForkJoinPool

ForkJoinPool 本质上也是一个生产者-消费者的实现,但是更加智能。ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当
我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提
交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队
列中。

工作窃取算法

ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。

  • 优点:充分利用线程进行并行计算,减少了线程间的竞争
  • 缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类:RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要我们定义子类去扩展。

计算斐波那契数列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static void main(String[] args) {
// 创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fib = new Fibonacci(30);
//启动分治任务
Integer result = fjp.invoke(fib);
// 输出结果
System.out.println(result);
}

// 递归任务
static class Fibonacci extends RecursiveTask<Integer> {
final int n;

Fibonacci(int n) {
this.n = n;
}

protected Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
// 创建⼦任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// 等待⼦任务结果,并合并结果
return f2.compute() + f1.join();
}
}

求和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();

// 生成一个计算任务,负责计算 1 + 2 + 3 + 4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);

System.out.println(result.get());
}

class CountTask extends RecursiveTask<Integer> {
/**
* 阈值
*/
private static final int THRESHOLD = 2;
private int start;
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;

// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 合并子任务
sum = leftResult + rightResult;
}

return sum;
}
}

Parallel Stream

Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

生命周期

Java 语言中线程共有六种状态,分别是:

  1. NEW(初始化状态)
  2. RUNNABLE(可运行 / 运行状态)
  3. BLOCKED(阻塞状态)
  4. WAITING(无时限等待)
  5. TIMED_WAITING(有时限等待)
  6. TERMINATED(终止状态)
    这看上去挺复杂的,状态类型也比较多。但其实在操作系统层面,Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,即休眠状态。也就是说只要 Java 线程处于这三种状态之一,那么这个线程就永远没有 CPU 的使用权。所以 Java 线程的生命周期可以简化为下图:
    线程状态转换图

RUNNABLE 与 BLOCKED 的状态转换

只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,这种情况下,等待的线程就会从 RUNNABLE 转换到 BLOCKED 状态。而当等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。

RUNNABLE 与 WAITING 的状态转换

总体来说,有三种场景会触发这种转换:

  1. 获得 synchronized 隐式锁的线程,调用无参数的 Object.wait() 方法
  2. 调用无参数的 Thread.join() 方法。其中的 join() 是一种线程同步方法
  3. 调用 LockSupport.park() 方法

RUNNABLE 与 TIMED_WAITING 的状态转换

有五种场景会触发这种转换:

  1. 调用带超时参数的 Thread.sleep(long millis) 方法
  2. 获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法
  3. 调用带超时参数的 Thread.join(long millis) 方法
  4. 调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法
  5. 调用带超时参数的 LockSupport.parkUntil(long deadline) 方法

从 NEW 到 RUNNABLE 状态

Java 刚创建出来的 Thread 对象就是 NEW 状态,NEW 状态的线程,不会被操作系统调度,因此不会执行。Java 线程要执行,就必须转换到 RUNNABLE 状态。从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法就可以了。

从 RUNNABLE 到 TERMINATED 状态

线程执行完 run() 方法后,会自动转换到 TERMINATED 状态,当然如果执行 run() 方法的时候异常抛出,也会导致线程终止。有时候我们需要强制中断 run() 方法的执行,在Java 的 Thread 类里面有个 stop() 方法,不过已经标记为 @Deprecated,所以不建议使用了。正确的姿势其实是调用 interrupt() 方法。

线程个数

对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:最佳线程数 =1 +(I/O 耗时 / CPU 耗时)

概念

管程封装了同步操作,对进程隐蔽了同步细节,简化了同步功能的调用界面。用户编写并发程序如同编写顺序(串行)程序。

引入目的:

  1. 把分散在各进程中的临界区集中起来进行管理
  2. 防止进程有意或无意的违法同步操作
  3. 便于用高级语言来书写程序,也便于程序正确性验证

MESA 模型

MESA 模型示意图如下:
MESA管程模型示意图

在管程模型里,共享变量和对共享变量的操作是被封装起来的,图中最外层的框就代表封装的意思。
框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。
当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。这个过程类似就医流程的分诊,只允许一个患者就诊,其他患者都在门口等待。
管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列,如上图,条件变量 A 和条件变量 B 分别都有自己的等待队列。

编程范式

对于 MESA 管程来说,有一个编程范式,就是需要在一个 while 循环里面调用 wait()。这个是 MESA 管程特有的。

1
2
3
while(条件不满足) {
wait();
}

之所以需要放在 while 循环里,这是因为在MESA 管程里面,T2 通知完 T1 后,T2 还是会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 notify() 不用放到代码的最后,T2 也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。

另外,除非经过深思熟虑,否则尽量使用 notifyAll(),原因如下:
notify() 是会随机地通知等待队列中的一个线程,而 notifyAll() 会通知等待队列中的所有线程。从感觉上来讲,应该是 notify() 更好一些,因为即便通知所有线程,也只有一个线程能够进入临界区。但那所谓的感觉往往都蕴藏着风险,实际上使用 notify() 也很有风险,它的风险在于可能导致某些线程永远不会被通知到。
假设我们有资源 A、B、C、D,线程 1 申请到了 AB,线程 2 申请到了 CD,此时线程 3 申请 AB,会进入等待队列(AB 分配给线程 1,线程 3 要求的条件不满足),线程 4 申请 CD 也会进入等待队列。我们再假设之后线程 1 归还了资源 AB,如果使用 notify() 来通知等待队列中的线程,有可能被通知的是线程 4,但线程 4 申请的是 CD,所以此时线程 4 还是会继续等待,而真正该唤醒的线程 3 就再也没有机会被唤醒了。

那什么时候可以使用 notify() 呢?需要满足以下三个条件:

  1. 所有等待线程拥有相同的等待条件
  2. 所有等待线程被唤醒后,执行相同的操作
  3. 只需要唤醒一个线程

概念

对于简单的原子性问题,还有一种无锁方案。Java SDK 并发包将这种无锁方案封装提炼之后,实现了一系列的原子类。

无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性,既解决了问题,又没有带来新的问题,可谓绝佳方案。

实现原理

原子类性能高的秘密很简单,硬件支持而已。CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。

使用 CAS 来解决并发问题,一般都会伴随着自旋,而所谓自旋,其实就是循环尝试。

在 CAS 方案中,有一个问题可能会常被你忽略,那就是 ABA 的问题。大多数情况下我们并不关心 ABA 问题,例如数值的原子递增,但也不能所有情况下都不关心,例如原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。所以在使用 CAS 方案的时候,一定要先 check 一下。

概览

Java SDK 并发包里提供的原子类内容很丰富,我们可以将它们分为五个类别:

  • 原子化的基本数据类型
  • 原子化的对象引用类型
  • 原子化数组
  • 原子化对象属性更新器
  • 原子化的累加器

这五个类别提供的方法基本上是相似的,并且每个类别都有若干原子类,我们可以通过下面的原子类组成概览图来获得一个全局的印象。
原子类概览图

分类

原子化的基本数据类型

相关实现有 AtomicBoolean、AtomicInteger 和 AtomicLong,提供的方法主要有以下这些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
getAndIncrement() // 原⼦化 i++
getAndDecrement() // 原⼦化的 i--
incrementAndGet() // 原⼦化的 ++i
decrementAndGet() // 原⼦化的 --i

// 当前值 +=delta,返回 += 前的值
getAndAdd(delta)
// 当前值 +=delta,返回 += 后的值
addAndGet(delta)

//CAS 操作,返回是否成功
compareAndSet(expect, update)

// 以下四个⽅法,新值可以通过传⼊ func 函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)

原子化的对象引用类型

相关实现有 AtomicReference、 AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。
AtomicReference 提供的方法和原子化的基本数据类型差不多。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。

解决 ABA 问题的思路其实很简单,增加一个版本号维度就可以了,这个和乐观锁机制很类似,每次执行 CAS 操作,附加再更新一个版本号,只要保证版本号是递增的,那么即便 A 变成 B 之后再变回 A,版本号也不会变回来(版本号递增的)。

AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
int expectedStamp,
int newStamp)

AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下:

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)

原子化数组

相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,利用这些原子类,我们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数。

原子化对象属性更新器

相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的。

需要注意的是,对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。

原子化的累加器

DoubleAccumulator、DoubleAdder、LongAccumulator 和 LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果我们仅仅需要累加操作,使用原子化的累加器性能会更好。

总结

无锁方案相对于互斥锁方案,优点非常多,首先性能好,其次是基本不会出现死锁问题(但可能出现饥饿和活锁问题,因为自旋会反复重试)。
Java 提供的原子类能够解决一些简单的原子性问题,但是所有原子类的方法都是针对一个共享变量的,如果我们需要解决多个变量的原子性问题,建议还是使用互斥锁方案。原子类虽好,但使用要慎之又慎。

概念

Worker Thread 设计模式有时也称为流水线设计模式,这种设计模式类似于工厂流水线。在 Worker Thread 模式中,工人线程 Worker thread 会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来。

线程池

Java 线程池就是 Worker Thread 设计模式的一种运用。

Java 的线程池既能够避免无限制地创建线程导致 OOM,也能避免无限制地接收任务导致 OOM。强烈建议你用创建有界的队列来接收任务。
当请求量大于有界队列的容量时,就需要合理地拒绝请求。如何合理地拒绝呢?这需要我们结合具体的业务场景来制定,即便线程池默认的拒绝策略能够满足你的需求,也同样建议我们在创建线程池时,清晰地指明拒绝策略。
同时,为了便于调试和诊断问题,同样强烈建议你在实际工作中给线程赋予一个业务相关的名字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService es = new ThreadPoolExecutor(
50,
500,
60L,
TimeUnit.SECONDS,
// 注意要创建有界队列
new LinkedBlockingQueue<Runnable>(2000),
// 建议根据业务需求实现ThreadFactory
r -> {
return new Thread(r, "echo-"+ r.hashCode());
},
// 建议根据业务需求实现RejectedExecutionHandler
new ThreadPoolExecutor.CallerRunsPolicy()
);

避免线程死锁

使用线程池过程中,还要注意一种线程死锁的场景。如果提交到相同线程池的任务不是相互独立的,而是有依赖关系的,那么就有可能导致线程死锁。

用下面的示例代码来模拟该应用,如果我们执行下面的这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// L1、L2阶段共⽤的线程池
ExecutorService es = Executors.newFixedThreadPool(2);
// L1阶段的闭锁
CountDownLatch l1 = new CountDownLatch(2);

for (int i=0; i<2; i++) {
System.out.println("L1");
// 执⾏L1阶段任务
es.execute(() -> {
// L2阶段的闭锁
CountDownLatch l2 = new CountDownLatch(2);
// 执⾏L2阶段⼦任务
for (int j=0; j<2; j++) {
es.execute(() -> {
System.out.println("L2");
l2.countDown();
});
}
// 等待L2阶段任务执⾏完
l2.await();
l1.countDown();
});
}

// 等着L1阶段任务执⾏完
l1.await();
System.out.println("end");

由此可见,提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重。

概念

为每一个消息的处理开辟一个线程使得消息能够以并发的方式进行处理,从而提高系统整体的吞吐量。

轻量级线程

由于Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大,所以,为每个请求创建一个新的线程并不适合高并发场景。

业界还有另外一种方案,叫做轻量级线程。这个方案在 Java 领域知名度并不高,但是在其他编程语言里却叫得很响,例如 Go 语言、Lua 语言里的协程,本质上就是一种轻量级的线程。轻量级的线程,创建的成本很低,基本上和创建一个普通对象的成本相似;并且创建的速度和内存占用相比操作系统线程至少有一个数量级的提升,所以基于轻量级线程实现 Thread-Per-Message 模式就完全没有问题了。