最短路径算法-2

    科技2025-11-28  16

    最短路径算法-2

    在这部分的最短路径算法中,我将介绍两种算法:Dijkstra和Bellman-Ford算法。

    Dijkstra算法

    Dijkstra算法是用来求两点之间最短路径的算法,适用于正权图,算法流程如下:

    1.将所有节点标记为未访问并将其存储。 2.将初始节点的距离设置为零,将其他节点的距离设置为无穷大。 3.选择距离最近的未访问节点,它是当前节点。 4.查找当前节点的未访问邻居,并计算它们到当前节点的距离。将新计算的距离与分配的距离进行比较,然后保存较小的距离。 5.将当前节点标记为已访问,并将其从未访问的集中删除。 6.如果访问了目标节点(在计划两个特定节点之间的路由时),或者未访问的节点之间的最小距离是无穷大,则停止。如果不是,请重复步骤3-6。

    下面我们结合一个实际的例子来说明上述算法流程,假设集合S保存访问过的节点,集合Q保存未访问的节点。

    如上图,每一条边的权重都是正数,现在需要找到A到B的最短路径。首先将所有节点保存在Q中,初始节点距离为0,其余节点的距离为无穷大。接着,选择距离最近的未访问节点A为当前节点,查找当前节点的未访问邻居(B、C),并计算它们到当前节点的距离(10,3),将新计算的距离与分配的距离(无穷大)进行比较,然后保存较小的距离(10,3)。将当前节点标记为已访问,并将其从未访问的集中删除(Q集合删掉A节点)。一直重复上述步骤直到访问到了节点B,如图中所示。这时即可得出A到B的最短路径为7。

    下面根据上述算法流程来编写程序,可以看到我们每次需要选择距离最短的节点进行访问,那么可以利用优先队列数据结构实现这个过程。利用二叉堆实现的优先队列的时间复杂度更小,可以提高算法效率。

    算法中所用的优先队列代码如下,其详细介绍在博客:优先队列。

    class PriorityQueue: """Min-heap-based priority queue, using 1-based indexing. Adapted from CLRS. Augmented to include a map of keys to their indices in the heap so that key lookup is constant time and decrease_key(key) is O(log n) time. """ def __init__(self): """Initializes the priority queue.""" self.heap = [None] # To make the index 1-based. self.key_index = {} # key to index mapping. def __len__(self): return len(self.heap) - 1 def __getitem__(self, i): return self.heap[i] def __setitem__(self, i, key): self.heap[i] = key def decrease_key(self, key): """Decreases the value of the key if it is in the priority queue and maintains the heap property.""" index = self.key_index[key] if index: self._decrease_key(index, key) def insert(self, key): """Inserts a key into the priority queue.""" self.heap.append(key) self.key_index[key] = len(self) self._decrease_key(len(self), key) def extract_min(self): """Removes and returns the minimum key.""" if len(self) < 1: return None self._swap(1, len(self)) min = self.heap.pop() del self.key_index[min] self._min_heapify(1) return min def _decrease_key(self, i, key): """Decreases key at a give index. Args: i: index of the key. key: key with decreased value. """ while i > 1: parent = i // 2 if self[parent] > key: self._swap(i, parent) i = parent else: break def _min_heapify(self, i): """Restores the heap property from index i downwards.""" l = 2 * i r = 2 * i + 1 smallest = i if l <= len(self) and self[l] < self[i]: smallest = l if r <= len(self) and self[r] < self[smallest]: smallest = r if smallest != i: self._swap(i, smallest) self._min_heapify(smallest) def _swap(self, i, j): # Swaps the key at indices i and j and updates the key to index map. self.heap[i], self.heap[j] = self.heap[j], self.heap[i] self.key_index[self.heap[i]], self.key_index[self.heap[j]] = i, j

    同时需要NodeDistancePair类将节点封装起来,以便优先队列根据距离取出节点。

    class NodeDistancePair(object): """Wraps a node and its distance representing it in the priority queue.""" def __init__(self, node, distance): """Creates a NodeDistancePair to be used as a key in the priority queue. """ self.node = node self.distance = distance def __lt__(self, other): # :nodoc: Delegate comparison to distance. return (self.distance < other.distance or (self.distance == other.distance and id(self.node) < id(other.node))) def __le__(self, other): # :nodoc: Delegate comparison to distance. return (self.distance < other.distance or (self.distance == other.distance and id(self.node) <= id(other.node))) def __gt__(self, other): # :nodoc: Delegate comparison to distance. return (self.distance > other.distance or (self.distance == other.distance and id(self.node) > id(other.node))) def __ge__(self, other): # :nodoc: Delegate comparison to distance. return (self.distance > other.distance or (self.distance == other.distance and id(self.node) >= id(other.node)))

    graph也需要一个类进行封装,定义其表示的方法:

    class Vertex(object): def __init__(self, name, d=None, p=None): self.name = name self.d = d self.p = p class Graph(object): def __init__(self, vertex, edge, weight): """ :param vertex: list of vertices :param edge: dict of the edges :param weight: dict of the weight """ self.vertex = [Vertex(v) for v in vertex] self.edge = edge self.weight = weight def get_vertex(self, name): for v in self.vertex: if name == v.name: return v def weight(self, u, v): index = u + v return self.weight[index] def get_adj(self, node): return self.edge[node]

    Dijkstra代码:

    def dijkstra(graph, source, destination): """Performs Dijkstra's algorithm until it finds the shortest path from source to destination in the graph with nodes and edges. Assumes that all weights are non-negative. Args: graph: the graph to be implemented source: the source node in the graph. destination: the destination node in the graph. Returns: A tuple of the path as a list of nodes from source to destination and the number of visited nodes. """ Q = PriorityQueue() # 初始化 for node in graph.vertex: node.parent = None node.queue_key = None # 起始节点添加进队列,表示已经访问 source = graph.get_vertex(source) source.queue_key = NodeDistancePair(source, 0) Q.insert(source.queue_key) num_visited = 0 while len(Q) > 0: # 获取队列中离当前节点距离最近的节点 u = Q.extract_min() num_visited = num_visited + 1 # 判断是否为目标节点 if u.node is destination: break # 遍历当前节点的邻居节点 for next_vertex in graph.get_adj(u.node): # 算下一节点到当前节点的距离 next_dist = graph.weight(u.node, next_vertex) + u.distance next = graph.get_vertex(next_vertex) # 没有被访问,则添加进队列 if next.queue_key is None: next.queue_key = NodeDistancePair(next.name, next_dist) Q.insert(next.queue_key) next.parent = u.node # 有被访问过,则重新计算距离并更新 elif next_dist < next.queue_key.distance: next.queue_key.distance = next_dist Q.decrease_key(next.queue_key) next.parent = u.node #反向遍历找到源节点到目标节点的路径 p = [] des = graph.get_vertex(destination) while des is not None: p.append(des.name) des = graph.get_vertex(des.parent) p.reverse() return (p, num_visited)

    对代码进行简单的测试,其计算所得的路径和路径长度与人工计算的结果相同。

    graph1 = Graph(vertex=['A','B','C','D','E'], edge={'A':['B','C'], 'B':['C','D'], 'C':['B','D','E'], 'D':['E'], 'E':['D']}, weight={'AB':10,'AC':3,'BD':2,'BC':1,'CB':4, 'CD':8,'CE':2,'DE':7,'ED':9}) print(dijkstra(graph1, 'A','D')) print(graph1.get_vertex('D').queue_key.distance) # output: (['A', 'C', 'B', 'D'], 5) 9

    利用二叉堆实现的Dijkstra算法的时间复杂度为 O(V lg V + E lg V )。

    Bellman-Ford算法

    当图中有负权边时,有些情况下Dijkstra算法将不再适用,例如下述情况中出现了negative cycle,若采用Dijkstras算法,程序将无法停止,会进入死循环。因此需要一种算法可以检测neagtive cycles。

    与Dijkstra算法相比,Bellman-Ford算法比较简单,在寻找最短路径时,它不像Dijkstra算法那样每次选择最近的节点,因此不需要优先队列的数据结构。

    class Vertex(object): def __init__(self, name, d=None, p=None): self.name = name self.d = d self.p = p class Graph(object): def __init__(self, vertex, edge, weight): """ :param vertex: list of vertices :param edge: dict of the edges :param weight: dict of the weight """ self.vertex_string = vertex self.vertex = [Vertex(v) for v in vertex] self.edge = edge self.weight = weight def get_vertex(self, name): for v in self.vertex: if name == v.name: return v def V(self): return self.vertex_string def E(self): return self.edge def w(self, u, v): index = u + v return self.weight[index] def initialize_single_source(self, s): for v in self.vertex: v.d = float('Inf') v.p = None if s == v.name: v.d = 0 def check(self, u, v): ve = self.get_vertex(v) ue = self.get_vertex(u) if ve.d > ue.d + self.w(u, v): print("negative cycle appears") def relax(self, u, v): """ :param u: curr vertex :param v: next vertex :return: None """ ve = self.get_vertex(v) ue = self.get_vertex(u) if ve.d > ue.d + self.w(u, v): ve.d = ue.d + self.w(u, v) ve.p = ue def get_adj(self, node): return self.edge[node]

    上面的代码用来表示图,下面是Bellman-Ford算法。首先对图进行了初始化,源节点的距离设置为0,其余节点设置为无穷大。对图中的每条边进行遍历更新距离,迭代|V-1|次,V是图的节点数。最后一步检测是否存在negative cycles,即判断是否存在ve.d > ue.d + w(u, v)的情况,其中ue为当前节点,ve为下一节点。

    def bellman_ford(G, s): G.initialize_single_source(s) for i in range(len(G.V())): for vertex in G.E(): for next_vertex in G.E()[vertex]: G.relax(vertex, next_vertex) for vertex in G.E(): for next_vertex in G.E()[vertex]: G.check(vertex, next_vertex) # 图中有negative cycle时: graph1 = Graph(vertex=['A','B','C','D','E'], edge={'A':['B','C'], 'B':['C','D'], 'C':['B','D','E'], 'D':['E'], 'E':['D']}, weight={'AB':10,'AC':3,'BD':2,'BC':1,'CB':-4, 'CD':8,'CE':2,'DE':7,'ED':9}) bellman_ford(graph1, 'A') print(graph1.get_vertex('B').d) # output: # negative cycle appears # negative cycle appears # -13 # 更改权重:计算的距离与人工手算相同 graph1 = Graph(vertex=['A','B','C','D','E'], edge={'A':['B','C'], 'B':['C','D'], 'C':['B','D','E'], 'D':['E'], 'E':['D']}, weight={'AB':10,'AC':3,'BD':2,'BC':1,'CB':4, 'CD':8,'CE':2,'DE':7,'ED':9}) # output: 7

    该算法的时间复杂度为O(VE),因为初始化需要O(V)时间,对E条边进行遍历更新距离,迭代|V-1|次需要O(VE)时间。

    Processed: 0.018, SQL: 9