Nacos配置中心集群原理及源码分析

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?
下面这个图,表示Nacos集群的部署图 。

Nacos配置中心集群原理及源码分析

文章插图
 
Nacos集群工作原理Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP) 。
Nacos的数据存储分为两部分
/data/program/nacos-1/data/config-data/${GROUP}在Nacos的设计中,MySQL是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的 。除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘 。
这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高 。
【Nacos配置中心集群原理及源码分析】当配置发生变更时:
  1. Nacos会把变更的配置保存到数据库,然后再写入本地文件 。
  2. 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中 。
另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件
配置变更同步入口当配置发生修改、删除、新增操作时,通过发布一个 notifyConfigChange 事件 。
@PostMApping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = https://www.isolves.com/it/wl/js/2022-03-30/"dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {//省略..if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}}//省略return true;}AsyncNotifyService配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件 。
@Autowiredpublic AsyncNotifyService(ServerMemberManager memberManager) {this.memberManager = memberManager;// Register ConfigDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);// Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {// Generate ConfigDataChangeEvent concurrentlyif (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表// 构建NotifySingleTask,并添加到队列中 。Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();for (Member member : ipList) { //遍历集群中的每个节点queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));}//异步执行任务 AsyncTaskConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));}}@Overridepublic Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}});}AsyncTask@Overridepublic void run() {executeAsyncInvoke();}private void executeAsyncInvoke() {while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空NotifySingleTask task = queue.poll(); //获取taskString targetIp = task.getTargetIP(); //获取目标ipif (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify//判断目标ip的健康状态boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行 。// target ip is unhealthy, then put it in the notification listConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);// get delay time and set fail count to the taskasyncTaskExecute(task);} else {//构建headerHeader header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());if (task.isBeta) {header.addParam("isBeta", "true");}AuthHeaderUtil.addIdentityToHeader(header);//通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}}


推荐阅读