100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > java多线程之阻塞队列--BlockingQueue

java多线程之阻塞队列--BlockingQueue

时间:2018-08-06 07:01:36

相关推荐

java多线程之阻塞队列--BlockingQueue

BlockingQueue阻塞队列的一些方法介绍:

BlockingQueue是一个接口,实现类如下

下面将介绍实现类的用法,和一些简单的列子

1).ArrayBlockingQueue:数组阻塞队列

数组阻塞队列是一个有边界,支出先进先出(FIFO first in first out)的队列,数组的头部元素是在数组中存在时间最长的元素,ArrayBlockingQueue中,生产者和消费者共同使用同一个锁,所有没有做到完全的并行操作,这一点与LinkedBlockingQueue不同,

2).LinkedBlockingQueue : 链表阻塞队列

LinkedBlockingQueue类似ArrayBlockQueue,是一个链表阻塞队列,但是生产者和消费者使用的是两个锁,所有在存储元素的同时也可以获取元素,LinkedBlockQueue 如果没有初始化大小的时候,当生产者存储元素的速度大于消费者消费元素的速度时,LinkedBlockingQueue会无限增长,在这种情况下,系统内存可能会消耗殆尽。

来看一个列子

package com.queue;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;/*** 链表阻塞队列测试* @author Administrator**/public class MyLinkedBlockingQueue {public static void main(String[] args) throws InterruptedException {LinkedBlockingQueue<String> abq = new LinkedBlockingQueue<String>(10);//三个生产者Producer p1 = new Producer(abq);Producer p2 = new Producer(abq);Producer p3 = new Producer(abq);//一个消费者Consumer queue1 = new Consumer(abq);//生成一个缓存线程池ExecutorService pool = Executors.newCachedThreadPool();pool.execute(p1);pool.execute(p2);pool.execute(p3);pool.execute(queue1);//当前线程睡眠10秒,等待线程池中的线程执行TimeUnit.SECONDS.sleep(10);//调用方法停止线程p1.myStop();p2.myStop();p3.myStop();TimeUnit.SECONDS.sleep(20);//启动一次顺序关闭,执行以前提交的任务,但不接受新任务pool.shutdown();}}/*** 消费者* @author lijh**/class Consumer implements Runnable{//注入队列private LinkedBlockingQueue<String> abq;private Boolean flag = true;public Consumer(LinkedBlockingQueue<String> abq){this.abq = abq;}@Overridepublic void run() {System.out.println("启动消费者线程");while(flag){try {// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。String date = abq.poll(2, TimeUnit.SECONDS);if(date != null){System.out.println("消费者得到数据 "+date);//休眠一秒TimeUnit.SECONDS.sleep(1);}else{//没有获取到数据,则退出循环flag = false;System.out.println("没有可获取的数据");}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}}}/*** 生产者* @author lijh**/class Producer implements Runnable{//注入队列private LinkedBlockingQueue<String> abq;//线程执行标准为boolean flag = true;public Producer(LinkedBlockingQueue<String> abq){this.abq = abq;}public void myStop() {flag = false;}@Overridepublic void run() {Random r = new Random();System.out.println("启动生产者线程");while(flag){System.out.println("开始生产数据"+ Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);// 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。if(!abq.offer("date "+r.nextInt(), 2, TimeUnit.SECONDS)){System.out.println("放入数据失败。。。");myStop();}else{System.out.println("数据生产成功"+ Thread.currentThread().getName());}} catch (InterruptedException e) {e.printStackTrace();//如果出现异常,则终止线程Thread.currentThread().interrupt();}}}}

部分执行结果

当队列放满时,再次调用abq.offer()往队列中存放元素,会返回false,然后我们调用mystop()方法终止线程,当队列元素为空时,再次调用abq.poll()方法得到元素,会返回null,我们手动终止线程。上面的列子我们可以吧LinkedBlockingQueue换成ArrayBlockingQueue也是可以执行的。

3).PriorityBlockingQueue优先级阻塞队列

存储在PriorityBlockingQueue队列中的元素必须实现Comparable接口来进行优先级排序,PriorityBlockingQueue是一个无界的队列,所有当生产者速度快于消费者时,会使内存消耗殆尽,需要注意,当队列中没有元素时,获取元素时会产生阻塞,PriorityBlockingQueue的生产者和消费者使用同一把锁,和ArrayBlckingQueue一样

package com.queue;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.TimeUnit;/*** 优先级阻塞队列,存储元素时不会阻塞,因为是一个无界队列,获取元素时,当队列为空时,会阻塞* @author lijh**/public class MyPriorityBlockingQueue {public static void main(String[] args) throws InterruptedException {PriorityBlockingQueue<User> pbq = new PriorityBlockingQueue<User>();Producer2 p1 = new Producer2(pbq);Producer2 p2 = new Producer2(pbq);Consumer2 c1 = new Consumer2(pbq);ExecutorService pool = Executors.newFixedThreadPool(3);pool.execute(p1);pool.execute(p2);pool.execute(c1);//main线程睡眠10秒,等待线程池中的线程执行TimeUnit.SECONDS.sleep(10);TimeUnit.SECONDS.sleep(10);//启动一次顺序关闭,执行以前提交的任务,但不接受新任务pool.shutdown();}}class User implements Comparable<User>{//优先级private Integer priority;//用户名private String username;public User(Integer priority,String username){this.priority = priority;this.username = username;}//优先级排序方法@Overridepublic int compareTo(User o) {return pareTo(o.getPriority());}public Integer getPriority() {return priority;}public void setPriority(Integer priority) {this.priority = priority;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}}/*** 生产者* @author lijh**/class Producer2 implements Runnable{boolean flag=true;private PriorityBlockingQueue<User> pbq;public Producer2(PriorityBlockingQueue<User> pbq){this.pbq = pbq;}@Overridepublic void run() {Random r = new Random();int i=1;while(flag){pbq.put(new User(r.nextInt(20),"李雷"+i));System.out.println("生产数据成功");i++;if(i>5){//大于5的时候终止,两个生产者线程,一共会产生10个元素flag=false;}}}}/*** 消费者* @author lijh**/class Consumer2 implements Runnable{boolean flag = true;private PriorityBlockingQueue<User> pbq;public Consumer2(PriorityBlockingQueue<User> pbq){this.pbq = pbq;}@Overridepublic void run() {while(flag){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}User user = pbq.poll();if(user == null){System.out.println("没有元素可以获取");flag = false;}else{System.out.println(user.getUsername()+"的优先级为"+user.getPriority());}}}}

4).延时队列DelayQueue

延时队列是一个支持延时获取元素的使用优先级队列实现的无界队列,优先级需要实现Comparable接口,延时需要实现Delayed接口,而Delayed接口实现了Comparable接口,所以我们只需要实现Delayed接口就可以了。

延时队列使用场景一般是定时任务和缓存系统设计

package com.queue;import java.util.Iterator;import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/*** 延时队列,队列中的元素需要实现Comparable和Delayed,存储元素时,可以设置在队列中存放的时间,* DelayQueue一般适用于定时任务和缓存系统设计* * 场景:考试时间为120分钟,30分钟后才可交卷,当时间到了,或学生都交完卷了考试结束。*这个场景中几个点需要注意:*考试时间为120分钟,30分钟后才可交卷,初始化考生完成试卷时间最小应为30分钟*对于能够在120分钟内交卷的考生,如何实现这些考生交卷*对于120分钟内没有完成考试的考生,在120分钟考试时间到后需要让他们强制交卷*在所有的考生都交完卷后,需要将控制线程关闭* @author lijh**/public class MyDelayQueue2 {public static void main(String[] args) throws InterruptedException {int studentNumber = 20;CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1);DelayQueue<Student> students = new DelayQueue<Student>();Random random = new Random();for (int i = 0; i < studentNumber; i++) {students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));}Thread teacherThread =new Thread(new Teacher(students)); //初始化一个120分钟的Student对象,当超过120分钟,则会终止Teacher线程,强制交卷,//所以当Student对象的考试时间超过120分钟时,会强制交卷students.put(new EndExam(students, 120,countDownLatch,teacherThread));teacherThread.start();countDownLatch.await();System.out.println(" 考试时间到,全部交卷!"); }}class Student implements Runnable,Delayed{private String name;private long workTime;//private long submitTime;//延迟时间private boolean isForce = false;private CountDownLatch countDownLatch;public Student(){}public Student(String name,long workTime,CountDownLatch countDownLatch){this.name = name;this.workTime = workTime;this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();this.countDownLatch = countDownLatch;}/*** 根据学生考试时间排序* 前面初始化了一个考试时间为120分钟的学生,当延迟队列取到这个学生时,结束Teacher线程,强制交卷*/@Overridepublic int compareTo(Delayed o) {if(o == null || ! (o instanceof Student)) return 1;if(o == this) return 0; Student s = (Student)o;if (this.workTime > s.workTime) {return 1;}else if (this.workTime == s.workTime) {return 0;}else {return -1;}}/*** 返回剩余延迟时间,当剩余延迟时间为0或者负数时,代表延迟时间用尽了。*/@Overridepublic long getDelay(TimeUnit unit) {//System.out.println(submitTime);return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic void run() {if (isForce) {System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" );}else {System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟"); }countDownLatch.countDown();}public boolean isForce() {return isForce;}public void setForce(boolean isForce) {this.isForce = isForce;}}/*** 此线程会结束Teacher线程,模拟强制交卷* @author lijh**/class EndExam extends Student{private DelayQueue<Student> students;private CountDownLatch countDownLatch;private Thread teacherThread;public EndExam(DelayQueue<Student> students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) {super("强制收卷", workTime,countDownLatch);this.students = students;this.countDownLatch = countDownLatch;this.teacherThread = teacherThread;}@Overridepublic void run() {teacherThread.interrupt();Student tmpStudent;for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext();) {tmpStudent = iterator2.next();tmpStudent.setForce(true);//调用Student线程的run方法,打印考试时间tmpStudent.run();}countDownLatch.countDown();}}/*** 监督学生考试线程* @author lijh**/class Teacher implements Runnable{private DelayQueue<Student> students;public Teacher(DelayQueue<Student> students){this.students = students;}@Overridepublic void run() {try {System.out.println(" test start");while(!Thread.interrupted()){students.take().run();}} catch (Exception e) {e.printStackTrace();}}}

5).同步队列 synchronousQueue

synchronousQueue是一个不存储元素的阻塞队列,每一个poll()方法必须等待一个take()操作,否则不能继续poll(),适用于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步

package com.queue;import java.util.Random;import java.util.UUID;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.SynchronousQueue;/*** 同步阻塞队列不会存储元素,每一个poll()方法必须等待一个take()操作,否则不能继续poll()* @author lijh**/public class MySynchronousQueue {public static void main(String[] args) {final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<String>();Producer3 p1 = new Producer3(synchronousQueue);Consumer3 p2 = new Consumer3(synchronousQueue);Consumer3 p3 = new Consumer3(synchronousQueue);//生成一个定长线程池ExecutorService pool = Executors.newFixedThreadPool(3);pool.execute(p1);pool.execute(p2);pool.execute(p3);}}/*** 消费者* @author lijh**/class Consumer3 implements Runnable {protected SynchronousQueue<String> blockingQueue;public Consumer3(SynchronousQueue<String> queue) {this.blockingQueue = queue;}@Overridepublic void run() {while (true) {try {//从同步队列中获取一个元素,如果队列中没有元素,则阻塞String data = blockingQueue.take();System.out.println(Thread.currentThread().getName()+ " take(): " + data);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 生产者* @author lijh**/class Producer3 implements Runnable {protected SynchronousQueue<String> blockingQueue;final Random random = new Random();public Producer3(SynchronousQueue<String> queue) {this.blockingQueue = queue;}@Overridepublic void run() {while (true) {try {String data = UUID.randomUUID().toString();System.out.println("Put: " + data);//往同步队列中插入一个元素,如果队列不为空则阻塞blockingQueue.put(data);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。