人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

并发设计模式--生产者-消费者模式

概念

过一个容器来解决生产者和消费者的强耦合关系,生产者生成数据无需等待消费者索取,消费者无需直接索要数据。两者并不进行任何通讯,而是通过容器来进行操作作用:解耦、支持并发、支持忙闲不均。

生产者-消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者-消费者模式的一个示意图:

生产者-消费者模式示意图

优点

  • 解耦
  • 支持异步
  • 平衡生产者和消费者的速度差异

实现方式

公共类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 资源接口
*
* @author cdrcool
*/
public interface Resource<T> {

/**
* 添加资源
*/
void add(T t) throws InterruptedException;

/**
* 获取资源
*/
T remove() throws InterruptedException;
}

/**
* 生产者线程
*
* @author cdrcool
*/
public class ProducerThread extends Thread {
private Resource<Integer> resource;

public ProducerThread(Resource<Integer> resource) {
this.resource = resource;
}

@Override
public void run() {
while (true) {
try {
// 模拟耗时
Thread.sleep(1000);

resource.add(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 消费者线程
*
* @author cdrcool
*/
public class ConsumerThread extends Thread {
private Resource<Integer> resource;

public ConsumerThread(Resource<Integer> resource) {
this.resource = resource;
}

@Override
public void run() {
while (true) {
try {
resource.remove();

// 模拟耗时
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 上下文
*
* @author cdrcool
*/
public class Context {

@Test
public void test() {
Resource<Integer> resource = new Resource1<>(10);

// 生产者
ProducerThread producer = new ProducerThread(resource);

// 多个消费者
ConsumerThread consumer1 = new ConsumerThread(resource);
ConsumerThread consumer2 = new ConsumerThread(resource);
ConsumerThread consumer3 = new ConsumerThread(resource);

producer.start();
consumer1.start();
consumer2.start();
consumer3.start();

try {
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

synchronized及wait和notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 基于synchronized及wait和notify
*
* @author cdrcool
*/
public class Resource1<T> implements Resource<T> {
/**
* 数组元数
*/
private Object[] items;

/**
* 添加的下标
*/
private int addIndex;

/**
* 删除的下标
*/
private int removeIndex;

/**
* 数组当前数量
*/
private int count;

public Resource1(int size) {
items = new Object[size];
}

@Override
public synchronized void add(T t) throws InterruptedException {
while (count == items.length) {
wait();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notifyAll();

System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t));
}

@Override
public synchronized T remove() throws InterruptedException {
while (count == 0) {
wait();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notifyAll();

System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));

//noinspection unchecked
return (T) x;
}
}

Lock和Condition的await和signalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 基于Lock和Condition的await和signalAll
*
* @author cdrcool
*/
public class Resource2<T> implements Resource<T> {
/**
* 数组元数
*/
private Object[] items;

/**
* 添加的下标
*/
private int addIndex;

/**
* 删除的下标
*/
private int removeIndex;

/**
* 数组当前数量
*/
private int count;

/**
* 可重入锁
*/
private Lock lock = new ReentrantLock();

/**
* 数组为空且要移除元素时,await;添加元素时,signal
*/
private Condition notEmpty = lock.newCondition();

/**
* 数组已满且要添加元素时,await;移除元素时,signal
*/
private Condition notFull = lock.newCondition();

public Resource2(int size) {
items = new Object[size];
}

@Override
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();

System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t));
} finally {
lock.unlock();
}
}

@Override
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();

System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));

//noinspection unchecked
return (T) x;
} finally {
lock.unlock();
}
}
}

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 基于BlockingQueue
*
* @author cdrcool
*/
public class Resource3<T> implements Resource<T> {
private BlockingQueue<T> items;

public Resource3(int size) {
items = new LinkedBlockingQueue<>(size);
}

@Override
public void add(T value) throws InterruptedException {
items.put(value);
System.out.println(String.format("生产者%s生产一个资源:%s,当前资源池有%s个资源",
Thread.currentThread().getName(), value, items.size()));
}

@Override
public T remove() throws InterruptedException {
T value = items.take();

System.out.println(String.format("消费者%s消耗一个资源:%s,当前资源池有%s个资源",
Thread.currentThread().getName(), value, items.size()));

return value;
}
}

应用示例

执行批量任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);

// 启动5个消费者线程执⾏批量任务
void start() {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i=0; i<5; i++) {
es.execute(()->{
try {
while (true) {
// 获取批量任务
List<Task> ts = pollTasks();
// 执⾏批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

// 从任务队列中获取批量任务
List<Task> pollTasks() throws InterruptedException {
List<Task> ts = new LinkedList<>();
// 阻塞式获取⼀条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
// ⾮阻塞式获取⼀条任务
t = bq.poll();
}
return ts;
}

// 批量执⾏任务
execTasks(List<Task> ts) {
// 省略具体代码⽆数
}

需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

分阶段提交

利用生产者-消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。考虑实现以下需求:

  1. ERROR级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
    这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面是示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    class Logger {
    // 任务队列
    final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
    // flush批量
    static final int batchSize = 500;
    // 只需要⼀个线程写⽇志
    ExecutorService es = Executors.newFixedThreadPool(1);

    //启动写⽇志线程
    void start(){
    File file = File.createTempFile("foo", ".log");
    final FileWriter writer = new FileWriter(file);
    this.es.execute(()->{
    try {
    // 未刷盘⽇志数量
    int curIdx = 0;
    long preFT = System.currentTimeMillis();
    while (true) {
    LogMsg log = bq.poll(5, TimeUnit.SECONDS);
    // 写⽇志
    if (log != null) {
    writer.write(log.toString());
    ++curIdx;
    }

    //如果不存在未刷盘数据,则⽆需刷盘
    if (curIdx <= 0) {
    continue;
    }

    // 根据规则刷盘
    if (log!=null && log.level==LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis() - preFT > 5000) {
    writer.flush();
    curIdx = 0;
    preFT = System.currentTimeMillis();
    }
    }
    } catch(Exception e) {
    e.printStackTrace();
    } finally {
    try {
    writer.flush();
    writer.close();
    } catch(IOException e) {
    e.printStackTrace();
    }
    }
    });
    }

    // 写INFO级别⽇志
    void info(String msg) {
    bq.put(new LogMsg(LEVEL.INFO, msg));
    }

    // 写ERROR级别⽇志
    void error(String msg) {
    bq.put(new LogMsg(LEVEL.ERROR, msg));
    }
    }

    // ⽇志级别
    enum LEVEL {
    INFO,
    ERROR
    }

    class LogMsg {
    LEVEL level;
    String msg;
    // 省略构造函数实现
    LogMsg(LEVEL lvl, String msg){}
    // 省略toString()实现
    String toString(){}
    }

总结

Java 语言提供的线程池本身就是一种生产者-消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。
生产者-消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,我们可以借助分布式消息队列(MQ)来实现生产者-消费者模式。MQ一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,我们可以结合观察者模式实现广播功能。

小礼物走一走,来 Github 关注我