阻塞队列

    科技2022-08-15  105

    工作中需要面对: 阻塞队列首先他得是一个队列,他取线程只能从一边取,放线程得从另一个边取。 当队列空的时候,取线程的一边就会阻塞。 当队列满的时候,放线程的一边就会阻塞。 为什么需要阻塞队列? 因为在多线程中,有些情况线程需要阻塞,也就是被挂起,而又有些情况我们需要唤醒我们阻塞的队列。而阻塞队列的存在使我们不用关系线程什么时候需要阻塞线程,什么时候需要唤醒线程。这一切都被阻塞队列包办了。在juc包发布以前需要程序要去控制这些细节,尤其需要兼顾效率和线程安全。

    架构: 阻塞队列直接继承了Collection结构和,和List接口平级。有很多个实现类。 但是据阳哥说常用的应该有三个,就是一个数组的阻塞队列,是连续的然后能规定个初始长度。然后就是用链表组成的阻塞队列,最后一个是单个元素的阻塞队列。 四组常用的api:

    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * @author Jing * @date 2020/10/5 0005 16:20 */ public class BlockQueueDemo { public static void main(String[] args) { // 老规矩左边接口右边实现类 BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 接下来演示第一组脾气火爆型api add remove element 他们是如果成功返回布尔不成功报异常 System.out.println(queue.add("1")); System.out.println(queue.add("2")); System.out.println(queue.add("3")); // System.out.println(queue.add("4")); // remove有两种,如果不传参数那么就删除最上面的,如果传参数就是找对应参数这个数据 System.out.println(queue.remove()); System.out.println(queue.remove("3")); queue.remove(); queue.remove(); // 这个方法是看看队列最前面的元素 queue.element(); } } import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * @author Jing * @date 2020/10/5 0005 16:20 */ public class BlockQueueDemo { public static void main(String[] args) { // 老规矩左边接口右边实现类 BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 接下来演示第二组 这组很温和 offer peek poll 都是返回布尔值 System.out.println(queue.offer("1")); System.out.println(queue.offer("2")); System.out.println(queue.offer("3")); System.out.println(queue.offer("4")); System.out.println(queue.peek()); queue.poll(); queue.poll(); queue.poll(); queue.poll(); } } import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * @author Jing * @date 2020/10/5 0005 16:20 */ public class BlockQueueDemo { public static void main(String[] args) throws InterruptedException { // 老规矩左边接口右边实现类 BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 第三组就是比较温和的一组 有take 和 put 组成 // 这个如果添加不进去了就一直等着,而且就算添加成功也没有其他返回新号说明 queue.put("1"); queue.put("2"); queue.put("3"); // // queue.put("4"); // 同理取数据也是一样,没有提示 如果列表空了会一直等。 queue.take(); queue.take(); queue.take(); queue.take(); } } import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * @author Jing * @date 2020/10/5 0005 16:20 */ public class BlockQueueDemo { public static void main(String[] args) throws InterruptedException { // 老规矩左边接口右边实现类 BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 最后一组也是offer但是可以传一个时间类型的,意为如果超时了就不等了 同样会返回布尔型而不是粗暴的报异常 System.out.println(queue.offer("1", 3, TimeUnit.SECONDS)); System.out.println(queue.offer("2", 3, TimeUnit.SECONDS)); System.out.println(queue.offer("3", 3, TimeUnit.SECONDS)); // System.out.println(queue.offer("4", 3, TimeUnit.SECONDS)); // 这个取数据的poll也是一样、而取不出来会返回个null System.out.println(queue.poll(3, TimeUnit.SECONDS)); System.out.println(queue.poll(3, TimeUnit.SECONDS)); System.out.println(queue.poll(3, TimeUnit.SECONDS)); System.out.println(queue.poll(3, TimeUnit.SECONDS)); } } import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * @author Jing * @date 2020/10/5 0005 16:20 */ public class BlockQueueDemo { public static void main(String[] args) throws InterruptedException { // 只有一个单个数据的队列 BlockingQueue<String> queue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName()+"开始放第一个"); queue.put("1"); System.out.println(Thread.currentThread().getName()+"开始放第二个"); queue.put("2"); System.out.println(Thread.currentThread().getName()+"开始放第三个"); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"A").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"B").start(); } }

    这个阻塞队列用在哪里呢?一般来说有三个,生产者消费者,线程池,消息中间件。 最后一部分内容,高级版生产者消费者。 这个版本 没有加锁,相当厉害

    import sun.security.provider.NativePRNG; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; /** * @author Jing * @date 2020/10/5 0005 17:13 */ public class ProConDemo { public static void main(String[] args) throws InterruptedException { shop shop = new shop(new ArrayBlockingQueue<>(6)); new Thread(() -> { try { shop.myPro(); } catch (InterruptedException e) { e.printStackTrace(); } },"aa").start(); new Thread(() -> { try { shop.myCon(); } catch (InterruptedException e) { e.printStackTrace(); } },"bb").start(); TimeUnit.SECONDS.sleep(10); shop.setFlag(false); } } class shop { private volatile Boolean flag = true; private final AtomicInteger atomicInteger = new AtomicInteger(); private final BlockingQueue<String> queue; public shop(BlockingQueue<String> queue) { this.queue = queue; System.out.println("传入队列类型" + queue.getClass().getName()); } public void myPro() throws InterruptedException { String data; while (flag) { data = atomicInteger.getAndIncrement() + ""; TimeUnit.MILLISECONDS.sleep(100); System.out.println("生产者正在生产商品"); queue.offer(data, 3, TimeUnit.SECONDS); System.out.println("生产者生产了一个并加到队列" + data); } System.out.println("退出了不再生产"); } public void myCon() throws InterruptedException { String data; while (flag){ if ((data = queue.poll(7, TimeUnit.SECONDS))!=null){ System.out.println("消费了一个商品那就是"+data); }else { break; } } System.out.println("退出了不在消费"); } public void setFlag(Boolean flag) { this.flag = flag; } }
    Processed: 0.019, SQL: 9