Android多线程编程之详解:阻塞队列+线程池


Android多线程编程之详解:阻塞队列+线程池

文章插图
 
一、阻塞队列简介阻塞队列常用于生产者和消费者场景,生产者往往是往队列里添加元素的线程,消费者
是从队列里拿元素的线程吗,阻塞队列就是生产者存放元素的容器,是消费者拿元素的容器
1.常见阻塞场景
当前队列中没有数据的情况下,消费端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
当队列种数据填充满的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中
有空的位置,线程被自动唤醒
2.BlockingQueue
2.1.放入数据:
  • offer(anobject):表示如果可以将anobject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)
  • offer(e o,long timeout,TimeUnit unit):可以设定等待的时间,如果在指定的时间还不能往队列中加入blockQuene,则返回失败
  • put(anobject):将anobject加到BlockingQueue里,如果BlockQueue没有控件,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续
2.2.获取数据:
  • poll(long timeout,TimeUnit unit):从BlockQueue中取出一个队首的对象,如果在指定时间内队列一旦有时间可以取,则立即返回队列中的数据,否则直到时间超过还没有数据可取,返回失败
  • take():取走BlockingQueue排在位首的对象,若BlockingQueue为空,则阻塞进入等待状态,直到BlockingQueue有新的数据加入
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)通过该方法,可以提升获取数据的效率,无需分多次分批加锁或释放锁 。
 
二、JAVA中的阻塞队列1.ArrayBlockingQueue:由数组结构组成的有界阻塞队列
他是用数组实现的有界阻塞队列,并按照先进先出的原则对元素进行排序,默认情况下不保证线程公平的访问队列,公平的访问队列就是指阻塞的所有生产者线程或消费者线程当队列不可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者先生产,先阻塞的消费者线程可以先从队列里获取元素,通常情况下为了保证公平性会降低吞吐量 。
2.LinkedBlockingQueue:由链表结构组成的有限阻塞队列
他是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出的原则对元素进行排序,其内部也会维持着一个数据缓冲队列,当生产者往队列中放入一个数据时,队列会从生产者手中获取数据并缓存在队列内部,而生产者立即返回,只有当队列缓冲区达到缓存容量的最大值时(可以指定该值),才会阻塞生产者线程,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之,对于消费者这段的处理也基于同样的原理,而LinkedBlockingQueue之所以能够高效的处理处理并发数据,还因为其对于生产者端和消费者端分别采用独立的锁来控制数据同步 。
以上两个常用的阻塞队列,还有五种不再详细介绍 。
下面分析ArrayBlockingQueue的源代码:
private static final long serialVersionUID = -817911632652898426L; final Object[] items;//阻塞队列维护的一个object类型的数组 int takeIndex;//队首元素 int putIndex;//队尾元素 int count;//队列中的元素 final ReentrantLock lock;//重入锁 private final Condition notEmpty;//条件对象判断数组不是满的 private final Condition notFull;//条件对象判断数组不是空的 transient Itrs itrs; final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } /** * Returns item at index i. */ @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; } /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } //取元素 public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock;//锁 lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//阻塞线程,等待notFull.signalAll()唤醒 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//阻塞线程,等待notEmpty.await()唤醒 return dequeue(); } finally { lock.unlock(); } }


推荐阅读