Java内置条件队列应用,实现经典的生产者消费者算法

背景“生产者和消费者模型” 是多线程通信的典型案例,本章节将利用前一节的锁和条件队列的知识,来实现一个完整的有界缓冲区,并创建多个线程访问该有界缓冲区,模拟生产者提供数据、消费者处理数据的过程,正文如下 。
生产者消费者模型生产者和消费者模型中,因为多个线程共享同一个缓冲区,所以就涉及到两个重要的通信约束:

  1. 缓冲区满的时候,生产者不能再添加数据,应该阻塞等待,直到缓冲区有空位;
  2. 缓冲区空的时候,消费者不能再获取数据,应该阻塞等待,直到有新的数据加入缓冲区 。
要保证上述约束条件,可以用 sleep 空循环,也可以使用锁和条件队列 。利用锁和条件队列实现的思路是,生产者和消费者有各自要等待的条件,一旦条件不满足,就阻塞在该条件队列上,直到另一个线程唤醒自己 。
实现过程缓冲区的 “满” 和 “空” 是两个条件,如果用内置锁,对缓冲区的操作由同一把锁保护,只能共用一个条件队列;如果使用显式锁,则可以定义两个条件队列 。
这里我们就用内置锁和内置条件队列来实现一个通信模型中的共享缓冲区类 。设计类图结构:
Java内置条件队列应用,实现经典的生产者消费者算法

文章插图
 
抽象有界限缓存首先,创建一抽象有界缓存类 ABoundedBuffer,提供插入和删除的基本实现 。
/** * @title:ABoundedBuffer * @description :有界缓存抽象类 * @update:2019-12-20 上午9:29:33 * @author:172.17.5.73 * @version:1.0.0 * @since:2019-12-20 */public abstract class ABoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count;protected ABoundedBuffer(int capacity){this.buf = (V[]) new Object[capacity]; }protected synchronized final void doPut(V v){buf[tail] = v;if(++tail==buf.length){tail = 0;}++count; } protected synchronized final V doTake(){V v = buf[head];buf[head] = null;if(++head==buf.length){head = 0;}--count;return v; }public synchronized final boolean isFull(){return count == buf.length; }public synchronized final boolean isEmpty(){return count==0; }}定义实现类其次,利用内置条件队列,编写子类实现可阻塞的插入和删除操作 。
插入操作,依赖的条件是缓存非满,当条件不满足时,调用 wait 方法挂起线程,一旦插入成功,说明缓存非空,则调用 notifyAll 方法唤醒等待非空的线程 。
删除操作,依赖的条件是非空,当条件不满足时,同样挂起等待,一旦删除成功,说明缓存非满,唤起等待该条件的线程 。
完整的源码为:
import JAVA.util.Date;/** ** @title:InnerConditionQueue * @description :使用内置条件队列,实现简单的有界缓存 *通过对象的 wait 和 notify 来实现挂起 *锁对象是 this,调用 wait/notify 的对象是同一个对象 。*三元关系(锁、wait/notify、条件谓词) *缺陷: *线程从 wait 中被唤醒时,并不代码条件谓词为真,此时还是需要再判断条件 。所以必须在循环中调用wait *每次醒来时都判断谓词的真假 。*谓词:对客体的描述或说明(是什么、怎么样、做什么),描述客体的本质、关系、特性等的词项 。* @update:2019-12-20 下午4:18:06 * @author:172.17.5.73 * @version:1.0.0 * @since:2019-12-20 */public class InnerConditionQueue<V> extends ABoundedBuffer<V> { protected InnerConditionQueue(int capacity) {super(capacity); } public synchronized void put(V v) throws InterruptedException{while(isFull()){System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());wait();}doPut(v);notifyAll(); }public synchronized V take() throws InterruptedException{while(isEmpty()){System.out.println(new Date()+" buffer is empty thread wait:"+Thread.currentThread().getName());wait();}V v = doTake();//每当在等待一个条件时,一定要确保在条件谓词变为真时,通过某种方式发出通知notifyAll();System.out.println(new Date()+" "+Thread.currentThread().getName()+" take:"+v);return v; }}测试类最后,编写测试代码,创建一个大小为 2 的缓冲区对象,同时启动三个线程执行插入操作,主线程执行四次消费操作 。测试代码如下:
import java.util.Date;public class Main { public static void main(String[] args) {final InnerConditionQueue<String> bu = new InnerConditionQueue<String>(2);Thread t1 = new Thread(new Runnable(){@Overridepublic void run() {try {bu.put("hello1");} catch (InterruptedException execption) {System.out.println("intercetp1:"+Thread.currentThread().getName());}}});Thread t2 = new Thread(new Runnable(){@Overridepublic void run() {try {bu.put("hello2");} catch (InterruptedException execption) {System.out.println("intercetp2:"+Thread.currentThread().getName());}}});Thread t3 =new Thread(new Runnable(){@Overridepublic void run() {try {bu.put("hello3");Thread.sleep(50000);bu.put("last one...");} catch (InterruptedException execption) {System.out.println("intercetp3:"+Thread.currentThread().getName());}}});t1.start();t2.start();t3.start();try {Thread.sleep(5000);bu.take();bu.take();bu.take();bu.take();} catch (InterruptedException execption) {execption.printStackTrace();}System.out.println(new Date()+" main over..."); }}


推荐阅读