分布式一致性协议有很多,例如Paxos协议,Zab协议,Raft协议,而Nacos采用的是Distro协议和Raft协议。对于非临时数据,Nacos采用的是Raft协议,而临时数据Nacos采用的是Distro协议。简单说一下Distro,Distro协议被定位为临时数据的一致性协议:该类型协议不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话,该会话只要存在,数据就不会丢失。本篇文章主要针对于CP的Raft算法。
Raft协议是一种强一致性、去中心化、高可用的分布式协议,它是用来解决分布式一致性问题的,相对于大名鼎鼎的Paxos协议,Raft协议更容易理解,并且在性能、可靠性、可用性方面是不输于Paxos协议的。许多中间件都是利用Raft协议来保证分布式一致性的,例如Redis的sentinel,CP模式的Nacos的leader选举都是通过Raft协议来实现的。因为Nacos的一致性协议是采用的Raft协议,所以建议在看相关代码前先了解Raft协议。 可以通过下面链接的动图了解Raft算法: raft算法
首先是init方法,init方法上存在@PostConstruct注解,所以在RaftCore初始化后就会执行init方法。
@PostConstruct public void init() throws Exception { Loggers.RAFT.info("initializing Raft sub-system"); //建立一个监听,监听Datums的修改和删除事件,(利用生产者消费者模式) executor.submit(notifier); final long start = System.currentTimeMillis(); //加载本地磁盘的Datums和Term进行数据恢复 raftStore.loadDatums(notifier, datums); setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); while (true) { if (notifier.tasks.size() <= 0) { break; } Thread.sleep(1000L); } initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); //每隔500ms去检查一次leader状态 GlobalExecutor.registerMasterElection(new MasterElection()); GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); }在这个方法中主要做了一下几件事: 1.建立一个监听,监听Datums的修改和删除事件 2.加载本地磁盘的Datums和Term进行数据恢复 3.注册leader选举的任务 4.注册发送心跳的任务
在init方法中注册了一个每500ms执行一次的leader选举的任务MasterElection,
public class MasterElection implements Runnable { @Override public void run() { try { if (!peers.isReady()) { return; } //得到本机信息 RaftPeer local = peers.local(); //leaderDueMs 发起投票的倒计时初始化是0-15000ms之间的随机值 // 任务每500ms执行一次,每次进来-500ms local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; if (local.leaderDueMs > 0) { return; } //倒计时小于0时进行拉票 // reset timeout //重置leader选举时间 local.resetLeaderDue(); //重置心跳发送时间 local.resetHeartbeatDue(); //拉票 sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } }leaderDueMs(默认是0-15000ms之间的随机数)每次进入到这个方法中都会减少500ms,当leaderDueMs减少到小于等于0,那么就会发起投票进行选举。而选举拉票的逻辑,主要就是sendVote()方法。
private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); peers.reset();//重置leader和票据 local.term.incrementAndGet();//增加term local.voteFor = local.ip; //当前投自己 local.state = RaftPeer.State.CANDIDATE;//设置当前状态为候选人 Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local)); //遍历除本机之外的机器 for (final String server : peers.allServersWithoutMySelf()) { //构建http请求 final String url = buildUrl(server, API_VOTE); try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT .error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url); return 1; } //获取其他节点的信息 RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //决定谁是leader peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } }而sendVote()方法主要就是做了以下几件事: 1.重置leader和选票
public void reset() { leader = null; for (RaftPeer peer : peers.values()) { peer.voteFor = null; } }2.把本机的term+1,把选票投给自己,并且设置自己为候选人的角色
local.term.incrementAndGet();//增加term local.voteFor = local.ip; //当前投自己 local.state = RaftPeer.State.CANDIDATE;//设置当前状态为候选人3.遍历本机以外的机器,发送异步请求到 /v1/ns/raft/vote接口,获取其他机器的选票信息
4.处理请求结果,根据请求的结果决定leader
peers.decideLeader(peer);首先看集群其他机器接收到请求后的处理逻辑,请求到RaftController的 /v1/ns/raft/vote接口后
@PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception { RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class)); return JacksonUtils.transferToJsonNode(peer); }最主要的逻辑就是在raftCore.receivedVote()
public synchronized RaftPeer receivedVote(RaftPeer remote) { if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } RaftPeer local = peers.get(NetUtils.localServer()); if (remote.term.get() <= local.term.get()) { //如果请求的任期小于自己的任期并且还没有投出选票,那么将票投给自己 String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } //选举时间重置 local.resetLeaderDue(); //状态标记为follower,并且为请求过来的机器投票 local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term); return local; }这个方法主要就是处理自己的选票的,当收到其他机器拉票的请求的时候,会比较term,如果自身的term大于全程请求机器的term,并且自己的选票没有还没投出去的时候,就把选票投给自己,否则将选票投给远程请求的机器,并且把自己的状态设置为follower,并且把信息返回出去。
在收到其他机器的返回的信息后,会根据结果决定leader,而这块的主要逻辑就是在 peers.decideLeader(peer)中。
public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); //选票最多的票数 int maxApproveCount = 0; //选票最多的ip String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } //收集选票 ips.add(peer.voteFor); //获取选票的最大值,并且设置选票最多的ip if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } //如果票数大于半数,将最多票数的节点状态设置为leader状态 if (maxApproveCount >= majorityCount()) { RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; // 如果当前leader和选举出来的leader不是同一个,那么将选举的leader重置并且发布一个leader选举完成的事件 if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; }其实这个方法就是看是否有机器已经获得一半以上的票数,如果有,那么设置它为leader,leader选举也就完成了。
至于发送心跳的部分,由于篇幅原因就在下一篇文章中讲解。
个人觉得阿里巴巴体系的源码还是比较好懂的,无论是nacos还是dubbo,感觉看起来不是太难,只要花时间应该都可以看得懂。