概念
过一个容器来解决生产者和消费者的强耦合关系,生产者生成数据无需等待消费者索取,消费者无需直接索要数据。两者并不进行任何通讯,而是通过容器来进行操作作用:解耦、支持并发、支持忙闲不均。
生产者-消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者-消费者模式的一个示意图:
优点
实现方式
公共类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
public interface Resource<T> {
void add(T t) throws InterruptedException;
T remove() throws InterruptedException; }
public class ProducerThread extends Thread { private Resource<Integer> resource;
public ProducerThread(Resource<Integer> resource) { this.resource = resource; }
@Override public void run() { while (true) { try { Thread.sleep(1000);
resource.add(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class ConsumerThread extends Thread { private Resource<Integer> resource;
public ConsumerThread(Resource<Integer> resource) { this.resource = resource; }
@Override public void run() { while (true) { try { resource.remove();
Thread.sleep(3 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Context {
@Test public void test() { Resource<Integer> resource = new Resource1<>(10);
ProducerThread producer = new ProducerThread(resource);
ConsumerThread consumer1 = new ConsumerThread(resource); ConsumerThread consumer2 = new ConsumerThread(resource); ConsumerThread consumer3 = new ConsumerThread(resource);
producer.start(); consumer1.start(); consumer2.start(); consumer3.start();
try { producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
synchronized及wait和notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
public class Resource1<T> implements Resource<T> {
private Object[] items;
private int addIndex;
private int removeIndex;
private int count;
public Resource1(int size) { items = new Object[size]; }
@Override public synchronized void add(T t) throws InterruptedException { while (count == items.length) { wait(); } items[addIndex] = t; if (++addIndex == items.length) { addIndex = 0; } ++count; notifyAll();
System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t)); }
@Override public synchronized T remove() throws InterruptedException { while (count == 0) { wait(); } Object x = items[removeIndex]; if (++removeIndex == items.length) { removeIndex = 0; } --count; notifyAll();
System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));
return (T) x; } }
|
Lock和Condition的await和signalAll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| /** * 基于Lock和Condition的await和signalAll * * @author cdrcool */ public class Resource2<T> implements Resource<T> { /** * 数组元数 */ private Object[] items;
/** * 添加的下标 */ private int addIndex;
/** * 删除的下标 */ private int removeIndex;
/** * 数组当前数量 */ private int count;
/** * 可重入锁 */ private Lock lock = new ReentrantLock();
/** * 数组为空且要移除元素时,await;添加元素时,signal */ private Condition notEmpty = lock.newCondition();
/** * 数组已满且要添加元素时,await;移除元素时,signal */ private Condition notFull = lock.newCondition();
public Resource2(int size) { items = new Object[size]; }
@Override public void add(T t) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[addIndex] = t; if (++addIndex == items.length) { addIndex = 0; } ++count; notEmpty.signal();
System.out.println(String.format("生产者%s生产一个资源:%s", Thread.currentThread().getName(), t)); } finally { lock.unlock(); } }
@Override public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } Object x = items[removeIndex]; if (++removeIndex == items.length) { removeIndex = 0; } --count; notFull.signal();
System.out.println(String.format("生产者%s消费一个资源:%s", Thread.currentThread().getName(), x));
//noinspection unchecked return (T) x; } finally { lock.unlock(); } } }
|
BlockingQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class Resource3<T> implements Resource<T> { private BlockingQueue<T> items;
public Resource3(int size) { items = new LinkedBlockingQueue<>(size); }
@Override public void add(T value) throws InterruptedException { items.put(value); System.out.println(String.format("生产者%s生产一个资源:%s,当前资源池有%s个资源", Thread.currentThread().getName(), value, items.size())); }
@Override public T remove() throws InterruptedException { T value = items.take();
System.out.println(String.format("消费者%s消耗一个资源:%s,当前资源池有%s个资源", Thread.currentThread().getName(), value, items.size()));
return value; } }
|
应用示例
执行批量任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
void start() { ExecutorService es = Executors.newFixedThreadPool(5); for (int i=0; i<5; i++) { es.execute(()->{ try { while (true) { List<Task> ts = pollTasks(); execTasks(ts); } } catch (Exception e) { e.printStackTrace(); } }); } }
List<Task> pollTasks() throws InterruptedException { List<Task> ts = new LinkedList<>(); Task t = bq.take(); while (t != null) { ts.add(t); t = bq.poll(); } return ts; }
execTasks(List<Task> ts) { }
|
需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。
分阶段提交
利用生产者-消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。考虑实现以下需求:
- ERROR级别的日志需要立即刷盘;
- 数据积累到500条需要立即刷盘;
- 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面是示例代码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| class Logger { final BlockingQueue<LogMsg> bq = new BlockingQueue<>(); static final int batchSize = 500; ExecutorService es = Executors.newFixedThreadPool(1); void start(){ File file = File.createTempFile("foo", ".log"); final FileWriter writer = new FileWriter(file); this.es.execute(()->{ try { int curIdx = 0; long preFT = System.currentTimeMillis(); while (true) { LogMsg log = bq.poll(5, TimeUnit.SECONDS); if (log != null) { writer.write(log.toString()); ++curIdx; } if (curIdx <= 0) { continue; } if (log!=null && log.level==LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis() - preFT > 5000) { writer.flush(); curIdx = 0; preFT = System.currentTimeMillis(); } } } catch(Exception e) { e.printStackTrace(); } finally { try { writer.flush(); writer.close(); } catch(IOException e) { e.printStackTrace(); } } }); } void info(String msg) { bq.put(new LogMsg(LEVEL.INFO, msg)); } void error(String msg) { bq.put(new LogMsg(LEVEL.ERROR, msg)); } }
enum LEVEL { INFO, ERROR }
class LogMsg { LEVEL level; String msg; LogMsg(LEVEL lvl, String msg){} String toString(){} }
|
总结
Java 语言提供的线程池本身就是一种生产者-消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。
生产者-消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,我们可以借助分布式消息队列(MQ)来实现生产者-消费者模式。MQ一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,我们可以结合观察者模式实现广播功能。