JAVA多线程问题 — 实现一个基于优先级的传输队列

    科技2025-06-25  16

    实现一个基于优先级的传输队列

    Java 7 API 提供几种与并发应用相关的数据类型。重点介绍以下2种数据类型:

    LinkedTransferQueue:这个数据类型支持那些有生产者和消费者结构的程序。 在那些应用,你有一个或者多个数据生产者,一个或多个数据消费者和一个被生产者和消费者共享的数据类型。生产者把数据放入数据结构内,然后消费者从数据结构内提取数据。如果数据结构为空,消费者会被阻塞直到有数据可以消费。如果数据结构满了,生产者就会被阻塞直到有空位来放数据。PriorityBlockingQueue:在这个数据结构,元素是按照顺序储存的。元素们必须实现 带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。

    LinkedTransferQueue 的元素是按照抵达顺序储存的,所以越早到的越先被消耗。你有可能需要开发 producer/ consumer 程序,它的消耗顺序是由优先级决定的而不是抵达时间。在这个指南,你将学习如何实现在 producer/ consumer 问题中使用的数据结构,这些元素将被按照他们的优先级排序,级别高的会先被消耗。

    按照这些步骤来实现下面的例子: 1、创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue:

    //1. 创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue 接口。 public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> { //2. 声明一个私有 AtomicInteger 属性,名为 counter,用来储存正在等待元素的消费者的数量。 private AtomicInteger counter; //3. 声明一个私有 LinkedBlockingQueue 属性,名为 transferred。 private LinkedBlockingQueue<E> transfered; //4. 声明一个私有 ReentrantLock 属性,名为 lock。 private ReentrantLock lock; //5. 实现类的构造函数,初始化它的属性值。 public MyPriorityTransferQueue() { counter = new AtomicInteger(0); lock = new ReentrantLock(); transfered = new LinkedBlockingQueue<E>(); } //6. 实现 tryTransfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,此方法返回 false 值。 @Override public boolean tryTransfer(E e) { lock.lock(); boolean value; if (counter.get() == 0) { value = false; } else { put(e); value = true; } lock.unlock(); return value; } //7. 实现 transfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待, //此方法把元素存入一个特殊queue,为了发送给第一个尝试获取一个元素的消费者并阻塞线程直到元素被消耗。 @Override public void transfer(E e) throws InterruptedException { lock.lock(); if (counter.get() != 0) { put(e); lock.unlock(); } else { transfered.add(e); lock.unlock(); synchronized (e) { e.wait(); } } } //8. 实现 tryTransfer() 方法,它接收3个参数: 元素,和需要等待消费者的时间(如果没有消费者的话),和用来注明时间的单位。如果有消费者在等待,立刻发送元素。 // 否则,转化时间到毫秒并使用 wait() 方法让线程进入休眠。当消费者取走元素时,如果线程在 wait() 方法里休眠,将使用 notify() 方法唤醒它。 @Override public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); if (counter.get() != 0) { put(e); lock.unlock(); return true; } else { transfered.add(e); long newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); lock.unlock(); e.wait(newTimeout); lock.lock(); if (transfered.contains(e)) { transfered.remove(e); lock.unlock(); return false; } else { lock.unlock(); return true; } } } //9. 实现 hasWaitingConsumer() 方法。使用 counter 属性值来计算此方法的返回值。如果counter 的值大于0,放回 true。不然,返回 false。 @Override public boolean hasWaitingConsumer() { return (counter.get() != 0); } //10. 实现 getWaitingConsumerCount() 方法。返回counter 属性值。 @Override public int getWaitingConsumerCount() { return counter.get(); } //11.实现 take() 方法。此方法是当消费者需要元素时被消费者调用的。首先,获取之前定义的锁并增加在等待的消费者数量。 @Override public E take() throws InterruptedException { lock.lock(); counter.incrementAndGet(); //12.如果在 transferred queue 中无任何元素。释放锁并使用 take() 方法尝试从queue中获取元素,此方法将让线程进入睡眠直到有元素可以消耗。 E value = transfered.poll(); if (value == null) { lock.unlock(); value = super.take(); lock.lock(); //13. 否则,从transferred queue 中取走元素并唤醒正在等待要消耗元素的线程(如果有的话)。 } else { synchronized (value) { value.notify(); } } //14. 最后,增加正在等待的消费者的数量并释放锁。 counter.decrementAndGet(); lock.unlock(); return value; } }

    2、实现一个类,名为 Event,扩展 Comparable 接口,把 Event 类参数化:

    //1. 类Event,扩展 Comparable 接口,把 Event 类参数化。 public class Event implements Comparable<Event> { //2. 声明一个私有 String 属性,名为 thread,用来储存创建事件的线程的名字。 private String thread; //3. 声明一个私有 int 属性,名为 priority,用来储存事件的优先级。 private int priority; //4. 实现类的构造函数,初始化它的属性值。 public Event(String thread, int priority) { this.thread = thread; this.priority = priority; } //5. 实现一个方法,返回 thread 属性值。 public String getThread() { return thread; } //6. 实现一个方法,返回 priority 属性值。 public int getPriority() { return priority; } //7. 实现 compareTo() 方法。此方法把当前事件与接收到的参数事件进行对比。返回 -1,如果当前事件的优先级的级别高于参数;返回 1,如果当前事件的优先级低于参数;如果相等,则返回 0。你将获得一个按优先级递减顺序排列的list。有高等级的事件就会被排到queue的最前面。 public int compareTo(Event e) { if (this.priority > e.getPriority()) { return -1; } else if (this.priority < e.getPriority()) { return 1; } else { return 0; } } }

    3、生产者Producer:

    //1.类Producer,它实现 Runnable 接口。 public class Producer implements Runnable { //2. 声明一个私有 MyPriorityTransferQueue 属性,接收参数化的 Event 类属性,名为 buffer,用来储存这个生产者生成的事件。 private MyPriorityTransferQueue<Event> buffer; //3. 实现类的构造函数,初始化它的属性值。 public Producer(MyPriorityTransferQueue<Event> buffer) { this.buffer = buffer; } //4. 这个类的实现 run() 方法。创建 100 个 Event 对象,用他们被创建的顺序决定优先级(越先创建的优先级越高)并使用 put() 方法把他们插入queue中。 public void run() { for (int i = 0; i < 100; i++) { Event event = new Event(Thread.currentThread().getName(), i); buffer.put(event); } } }

    4、消费者Consumer:

    //1. 实现一个类,名为 Consumer,它要实现 Runnable 接口。 public class Consumer implements Runnable { //2. 声明一个私有 MyPriorityTransferQueue 属性,参数化 Event 类属性,名为 buffer,用来获取这个类的事件消费者。 private MyPriorityTransferQueue<Event> buffer; //3. 实现类的构造函数,初始化它的属性值。 public Consumer(MyPriorityTransferQueue<Event> buffer) { this.buffer = buffer; } //4. 实现 run() 方法。它使用 take() 方法消耗1002 Events (这个例子实现的全部事件)并把生成事件的线程数量和它的优先级别写入操控台。 @Override public void run() { for (int i = 0; i < 1002; i++) { try { Event value = buffer.take(); System.out.printf("Consumer: %s: %d\n", value.getThread(), value.getPriority()); } catch (InterruptedException e) { e.printStackTrace(); } } } }

    5、主类测试:

    //1. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。 public class Main { public static void main(String[] args) throws Exception { //2. 创建一个 MyPriorityTransferQueue 对象,名为 buffer。 MyPriorityTransferQueue<Event> buffer = new MyPriorityTransferQueue<Event>(); //3. 创建一个 Producer 任务并运行 10 线程来执行任务。 Producer producer = new Producer(buffer); Thread producerThreads[] = new Thread[10]; for (int i = 0; i < producerThreads.length; i++) { producerThreads[i] = new Thread(producer); producerThreads[i].start(); } //4.创建并运行一个 Consumer 任务。 Consumer consumer = new Consumer(buffer); Thread consumerThread = new Thread(consumer); consumerThread.start(); //5. 写入当前的消费者数量。 System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount()); //6. 使用 transfer() 方法传输一个事件给消费者。 Event myEvent = new Event("Core Event", 0); buffer.transfer(myEvent); System.out.printf("Main: My Event has ben transfered.\n"); //7. 使用 join() 方法等待生产者的完结。 for (int i = 0; i < producerThreads.length; i++) { try { producerThreads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } //8. 让线程休眠1秒。 TimeUnit.SECONDS.sleep(1); //9.写入当前的消费者数量。 System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount()); //10. 使用 transfer() 方法传输另一个事件。 myEvent = new Event("Core Event 2", 0); buffer.transfer(myEvent); //11. 使用 join() 方法等待消费者完结。 consumerThread.join(); //12. 写信息表明程序结束。 System.out.printf("Main: End of the program\n"); } }

    在这个例子已经实现了 MyPriorityTransferQueue 数据结构。这个数据类型是在 producer/consumer 问题中使用的,它的元素是按照优先级排列的。由于 Java 不支持多个继承,所以你首先要决定的是 MyPriorityTransferQueue 类的基类。你扩展了 PriorityBlockingQueue 类,来实现在结构中插入数据按照优先级排序。你也实现了 TransferQueue 接口,添加了与 producer/consumer 相关的3个方法。

    MyPriorityTransferQueue 类有以下2个属性:

    AtomicInteger 属性,名为 counter: 此属性储存了正在等待从数据类型提取元素的消费者的数量。当一个消费者调用 take()操作来从数据类型中提取元素时,counter 数增加。当消费者结束 take() 操作的执行时,counter 数再次增加。在 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的实现中使用到了 counter。ReentrantLock 属性,名为 lock: 此属性是用来控制访问已实现的操作。只有一个线程可以用数据类型。最后一个,LinkedBlockingQueue list 用来储存传输的元素。

    在 MyPriorityTransferQueue 中,你实现了一些方法。全部方法都在 TransferQueue 接口中声明了和在PriorityBlockingQueue 接口实现的 take() 方法。在之前已经描述了2个方法了。来看看剩下的方法的描述:

    tryTransfer(E e): 此方法尝试直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者,并返回 true 值。如果没有消费者在等待,方法返回 false 值。

    transfer(E e): 此方法直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者。 否则,把元素储存到已传输的元素list 并阻塞线程直到元素被消耗。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。

    tryTransfer(E e, long timeout, TimeUnit unit): 此方法与 transfer() 方法相似,只是它的线程被阻塞的时间段是由参数决定的。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。

    take(): 此方法返回下一个要被消耗的元素。如果在 transferred 元素list中有元素,就从list中取走元素。否则,就从 priority queue 中取元素。

    一旦你实现了数据类型,你就实现了 Event 类。它就是在数据类型里储存的元素构成的类。Event 类有2个属性用来储存生产者的ID和事件的优先级,并实现了 Comparable 接口,为了满足你的数据类型的需要。

    接着,你实现了 Producer 和 Consumer 类。在这个例子中,你有 10 个生产者和一个消费者,他们共享同一个 buffer。每个生产者生成100个事件,他们的优先级是递增的, 所以有高优先级的事件在越后面才生成。

    例子的主类创建了一个 MyPriorityTransferQueue 对象,10个生产者,和一个消费者,然后使用MyPriorityTransferQueue buffer 的 transfer() 方法来传输2个事件到 buffer。

    以下截图是程序运行的部分输出:

    Processed: 0.009, SQL: 8