Priority queue

    科技2025-11-20  11

    Priority queue(优先队列)

    队列是一种满足先进先出原则的数据结构,在Python中利用列表就可以简单地实现一个队列。首先需要定义一个列表,接在在列表中加入元素,最后利用pop方法将最先加入的元素取出,即可实现一个队列。

    queue = [] queue.append(2) queue.append(3) queue.append(1) queue.pop(0) # output: 2

    队列的应用很多,比如在BFS算法中就使用到了队列的数据结构。但在有些场合中,我们不需要按照时间的先后顺序来取出元素,而是根据优先等级来取元素,这时就需要优先队列这种数据结构。

    简单地说,优先队列也会像队列一样保存一些元素,但取出元素的方式会按照元素的等级进行,一般有最大优先队列和最小优先队列之分,前者每次取出的是最大等级的元素,后者取出的是最小等级的元素。按照这种说法,我们可以将上面的代码简单修改一下就实现了优先队列的功能。

    比如我们按照客户的VIP等级对客户提供服务,级别高的先服务。首先把客户和其VIP等级添加进队列里,接着将列表降序排序,输出第一个元素即是VIP等级最高的客户。

    pq = [] pq.append((2,"Tom")) pq.append((3,"Tina")) pq.append((1,"Tony")) pq.sort(reverse=True) pq.pop(0) # output: (3,"Tina")

    利用列表实现的优先队列虽然简单,但其扩展性较差,比如我们希望输出客户这个对象,而不是将加入列表里面的内容全部输出,这时的列表将不能满足这个功能。同时调用pop方法,会改变剩下元素的索引,其时间复杂度为O(n)。那么有什么方法可以减少时间复杂度,同时提供优先队列的扩展性呢?

    上面代码中,当我们取出最高等级的元素时,所用的时间复杂度为O(n),用何种数据结构可以使得取出最大值或者最小值的时间复杂度小于O(n)呢?二叉堆可以做到,在Heapsort中,我们可以看到对于最大堆,取出最大元素只要取出第一个元素即可,同时调用Max-heapify方法使树的结构满足堆的特性。该方法的时间复杂度为O(logn),因此完成取出最大值的时间复杂度为O(logn),要小于线性时间。

    下面我将介绍如何利用二叉堆来设计一个优先队列,我将以最小优先队列为例进行介绍。

    最主要需要三个方法:

    Insert: 插入元素Extract_Min: 取出最小等级的元素Decrease_Key: 减小某一元素的等级,减小之后需要交换元素以满足堆的特性。

    Python代码如下:

    class PriorityQueue: """Min-heap-based priority queue, using 1-based indexing. Adapted from CLRS. Augmented to include a map of keys to their indices in the heap so that key lookup is constant time and decrease_key(key) is O(log n) time. """ def __init__(self): """Initializes the priority queue.""" self.heap = [None] # To make the index 1-based. self.key_index = {} # key to index mapping. def __len__(self): return len(self.heap) - 1 def __getitem__(self, i): return self.heap[i] def __setitem__(self, i, key): self.heap[i] = key def decrease_key(self, key): """Decreases the value of the key if it is in the priority queue and maintains the heap property.""" index = self.key_index[key] if index: self._decrease_key(index, key) def insert(self, key): """Inserts a key into the priority queue.""" self.heap.append(key) self.key_index[key] = len(self) self._decrease_key(len(self), key) def extract_min(self): """Removes and returns the minimum key.""" if len(self) < 1: return None self._swap(1, len(self)) min = self.heap.pop() del self.key_index[min] self._min_heapify(1) return min def _decrease_key(self, i, key): """Decreases key at a give index. Args: i: index of the key. key: key with decreased value. """ while i > 1: parent = i // 2 if self[parent] > key: self._swap(i, parent) i = parent else: break def _min_heapify(self, i): """Restores the heap property from index i downwards.""" l = 2 * i r = 2 * i + 1 smallest = i if l <= len(self) and self[l] < self[i]: smallest = l if r <= len(self) and self[r] < self[smallest]: smallest = r if smallest != i: self._swap(i, smallest) self._min_heapify(smallest) def _swap(self, i, j): # Swaps the key at indices i and j and updates the key to index map. self.heap[i], self.heap[j] = self.heap[j], self.heap[i] self.key_index[self.heap[i]], self.key_index[self.heap[j]] = i, j

    以下面这个例子说明下上述代码如何使用。现在我需要给车的价值排个序,每次取出价值最低的车。为了方便,创建一个Car类。在上面代码中,我们是按照key的大小构造了二叉堆。按照这个意思我们应该把车的价值用insert方法加入到队列中,但是那样就无法获取其对应的车的信息。为此我们将每个车的对象加入到优先队列,按照它们的价值构造二叉堆,故要在Car类中定义比较大小的方法,如下:

    class Car(object): def __init__(self, name, value): self.name = name self.value = value def __lt__(self, other): # :nodoc: Delegate comparison to distance. return (self.value < other.value or (self.value == other.value and id(self.name) < id(other.name))) def __le__(self, other): # :nodoc: Delegate comparison to distance. return (self.value < other.value or (self.value == other.value and id(self.name) <= id(other.name))) def __gt__(self, other): # :nodoc: Delegate comparison to distance. return (self.value > other.value or (self.value == other.value and id(self.name) > id(other.name))) def __ge__(self, other): # :nodoc: Delegate comparison to distance. return (self.value > other.value or (self.value == other.value and id(self.name) >= id(other.name))) car1 = Car("BMW", 45) car2 = Car("Maybach", 145) car3 = Car("Bugatti", 85) car4 = Car("Cadillac", 78) car5 = Car("Maserati", 85) pq = PriorityQueue() pq.insert(car1) pq.insert(car2) pq.insert(car3) pq.insert(car4) pq.insert(car5) print("队列大小:{0}".format(len(pq))) print(pq.extract_min().name) # output: # 队列大小:5 # BMW

    BMW的价值最小,利用extract_min方法取出了car1对象,其name是BMW。

    其实在python中已经有相关的模块可实现上述的优先队列的功能,如heapq模块,但只提供实现最小优先队列功能。

    import heapq q = [] heapq.heappush(q, (2, 'code')) heapq.heappush(q, (1, 'eat')) heapq.heappush(q, (3, 'sleep')) print(heapq.heappop(q)) # output:(1, 'eat')

    还有queue中的PriorityQueue,其时间复杂度与heapq的一样,区别在于PriorityQueue是同步的,提供了锁语义来支持多个并发的生产者和消费者。

    from queue import PriorityQueue q = PriorityQueue() q.put((2, 'code')) q.put((1, 'eat')) q.put((3, 'sleep')) print(q.get()) # output:(1, 'eat')

    参考资料:

    1.算法导论第三版

    2.https://geek-docs.com/python/python-examples/python-priority-queue.html

    Processed: 0.011, SQL: 8