阻塞队列介绍

    科技2022-08-19  98

    阻塞队列

    BlockingQueue和List接口一样,都继承Collection接口

    实现类: ArrayBlockingQueue:由数组结构组成的有界阻塞队列 LinkedBlockingQueue:由链表结构组成的有界阻塞队列默认值是int最大值 SynchronousQueue:不存储元素的阻塞队列,即单元素队列

    常用API

    抛出异常(不友好)

    代码如下:

    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue .add("a"); blockingQueue .add("b"); blockingQueue .add("c"); //blockingQueue .add("d"); //Queue full blockingQueue .remove(); blockingQueue .remove(); blockingQueue .remove(); //blockingQueue .remove(); //NoSuchElementException

    特殊值(开发常用)

    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); //false System.out.println(blockingQueue.poll()); //a System.out.println(blockingQueue.poll()); //b System.out.println(blockingQueue.poll()); //c System.out.println(blockingQueue.poll()); //null

    阻塞

    BlockingQueue<String> blockingQueue= new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); blockingQueue.put("d"); //会一直阻塞 System.out.println(blockingQueue.take()); //a System.out.println(blockingQueue.take()); //b System.out.println(blockingQueue.take()); //c System.out.println(blockingQueue.take()); //null

    超时

    BlockingQueue<String> blockingQueue= new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); //等待2s钟插入不进去返回false System.out.println(blockingQueue.poll()); //b System.out.println(blockingQueue.poll()); //c System.out.println(blockingQueue.poll()); //d System.out.println(blockingQueue.poll()); //null

    总结:

    抛出异常:

    当阻塞队列满时候,再往里边add元素时候就会抛queue full 当阻塞队列空时候,再往里边remove时会抛NosuchElmentException

    特殊值:

    插入方法offer插入成功返回true,插入失败返回false。 移除方法poll 移除成功返回对应元素,移除失败返回null。

    阻塞:

    当队列满时,生产者再往里边put元素时,队列就会一直阻塞生产线程知道put进去或者响应中断退出。 当队列空时,消费者线程试图从队列中take元素时,生产者线程会一直阻塞知道队列中有元素才退出。

    超时

    当队列满时,队列会阻塞生产者线程一定时间,超时后生产者线程退出。

    SynchronousQueue不存储元素,生产一个消费一个

    代码展示:

    public static void main(String[] args) { SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"\t"+"put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"\t"+"put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"\t"+"put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } },"aa").start(); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); //睡5s确保aa线程先执行 System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); Thread.sleep(5000); //睡5s确保aa线程先执行 System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); Thread.sleep(5000); //睡5s确保aa线程先执行 System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } },"bb").start(); } 结果: aa put 1 bb1 aa put 2 bb2 aa put 3 bb3

    结论: SynchronousQueue阻塞队列生产一个消费一个,只要不消费我就阻塞不会生产,队列中不存储元素。

    阻塞队列用在哪里

    生产者消费者模式

    传统版的生产者消费者模式

    /** * 多线程环境下的生产者消费者模式 * 1.线程 操作(方法) 资源类 * 2.判断 干活 唤醒 * 3.防止虚假唤醒机制(用while判断) */ class ShareData{ //资源类 private static int number; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { //增加 lock.lock(); //相当于同步代码块,在类没有初始化的时候都已经加载到内存了, //因此在同步代码块内部可以用静态变量。 //虽然这个方法是非静态方法。 try{ while(number !=0){ //已经生产了一个就不要再生产了 //生产线程停止 condition.await(); } //干活先生产一个 number++; //唤醒生产线程 condition.signalAll(); System.out.println(Thread.currentThread().getName()+"\t"+number); }finally { lock.unlock(); } } public void decrement() throws Exception { //减少 lock.lock(); try{ while(number ==0){ //还没生产 //消费线程停止 condition.await(); } //干活 number--; //唤醒生产线程 condition.signalAll(); System.out.println(Thread.currentThread().getName()+"\t"+number); }finally { lock.unlock(); } } } 入口函数 public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(new Runnable() { @Override public void run() { for (int i=1;i<=5;i++){ try { shareData.increment(); //当我第一次调用这个方法number++执行 } catch (Exception e) { e.printStackTrace(); } } } },"aa").start(); new Thread(new Runnable() { @Override public void run() { for (int i=1;i<=5;i++){ try { shareData.decrement(); //第一次我先睡眠,等待aa线程唤醒 } catch (Exception e) { e.printStackTrace(); } } } },"bb").start(); } 结果: aa 1 bb 0 aa 1 bb 0 aa 1 bb 0 aa 1 bb 0 aa 1 bb 0

    总结: 线程操作资源类,资源类是高内聚里边有加1和-1两个方法,线程对这两个方法进 行操作,初始值是0,线程bb等待,线程aa执行,把number变量+1唤醒bb线程, 自己还想执行这个方法,结果陷入等待,bb线程醒来之后把number变量-1,唤 醒aa,自己等待。如果把while换成if两个线程不会出现异常,如果是大于两个就 会出现异常。

    阻塞队列版的生产者消费者

    public class BlockingqueueDemo { private volatile Boolean flag = true; //保证线程间可见性 private BlockingQueue<String> blockingQueue = null; private AtomicInteger atomicInteger = new AtomicInteger(); public BlockingqueueDemo(BlockingQueue<String> blockingQueue){ this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); //打印传入的实现类是什么 } public void pro(){ //生产者 String data = null; try { while (flag){ data = atomicInteger.incrementAndGet()+""; //++操作 data = 1; Boolean result = blockingQueue.offer(data,2L, TimeUnit.SECONDS); //放入阻塞队列 if(result){ System.out.println(Thread.currentThread().getName()+"放入"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"放入"+data+"失败"); } Thread.sleep(1000); //停一秒让消费线程执行 } System.out.println("标记位被置为false,停止生产和消费"); } catch (InterruptedException e) { e.printStackTrace(); } } public void Consumer(){ //消费者 String result = null; try { while (flag){ result = blockingQueue.poll(2L, TimeUnit.SECONDS); Thread.sleep(1000); //停止一秒确保生产。 if(null == result){ //flag被变成false生产不生产了,消费就取不到方法结束。 return; } System.out.println("消费线程消费"+result+"成功"); } } catch (InterruptedException e) { e.printStackTrace(); } } public void stop(){ this.flag = false; }

    结论: 用volatile控制线程间的可见性,只要标记为位true,生产者、消费者同时执 行,一直在做循环(CAS)生产消费。当标记位为false时跳出循环生产者,消费者结束。用阻塞队列存储消息,不用加锁。

    Processed: 0.016, SQL: 9