目录
一、引言
二、阻塞队列种类
三、阻塞队列使用
四、阻塞队列实现原理
五、总结
一、引言
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者线程可以把生产结果存到阻塞队列中,而消费者线程把中间结果取出并在将来修改它们。比如自助火锅传送带送菜,每当我们拿下一盘菜的时候,服务人员才能添加到上面,当传送带上没有东西的时候我们必须等着服务员放菜。
二、阻塞队列种类
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
三、阻塞队列使用
每0.4秒中生产一辆车,每1秒中提取一辆车,所以车厂总是会有车
public class TestQueue {public static void main(String []args){final Object o=new Object();final BlockingQueue<String> queue=new ArrayBlockingQueue<String>(2);for(int i=0;i<2;i++){new Thread(new Runnable() {public void run() {while (true) {try {Thread.sleep(400);System.out.println(Thread.currentThread().getName() + "准备生产汽车");queue.put("qiche");} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "生产汽车完毕,已有" + queue.size() + "辆汽车");}}}).start();}new Thread(new Runnable() {public void run() {while (true) {try {//Thread.sleep((int)Math.random()*10000000);Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "准备提走汽车");queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "已经提走汽车,还有" + queue.size() + "辆汽车");}}}).start();}}
四、阻塞队列实现原理
阻塞队列实现方法使用的lock+condition高级方法来实现的,可以参考Java多线程(三)——多线程实现同步来了解lock与condition使用。
public class TestQueueyuanli {public static void main(String []args){final Object o=new Object();//final BlockingQueue<String> queue=new ArrayBlockingQueue<String>(2);final BoundedBuffer queue =new BoundedBuffer();for(int i=0;i<2;i++){new Thread(new Runnable() {public void run() {while (true) {try {//Thread.sleep((int)Math.random()*10000000);Thread.sleep(400);System.out.println(Thread.currentThread().getName() + "准备生产汽车");queue.put("qiche");} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "生产汽车完毕,已有" + queue.size() + "辆汽车");}}}).start();}new Thread(new Runnable() {public void run() {while (true) {try {//Thread.sleep((int)Math.random()*10000000);Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "准备提走汽车");queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "已经提走汽车,还有" + queue.size() + "辆汽车");}}}).start();}}class BoundedBuffer {final Lock lock = new ReentrantLock();final Condition notFull = lock.newCondition();//notFull代表取能否放数据final Condition notEmpty = lock.newCondition();//notEmpoty代表能否取数据final Object[] items = new Object[5];int putptr, takeptr, count;public void put(Object x) throws InterruptedException {lock.lock(); try {while (count == items.length)notFull.await();items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;notEmpty.signal();} finally { lock.unlock(); }}public Object take() throws InterruptedException {lock.lock(); try {while (count == 0)notEmpty.await();Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;notFull.signal();return x;} finally { lock.unlock(); }}public int size(){return count;}}