千家信息网

Nacos源码阅读方法是什么

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,这篇文章主要介绍"Nacos源码阅读方法是什么"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Nacos源码阅读方法是什么"文章能帮助大家解决问题。先给大家献上
千家信息网最后更新 2025年01月20日Nacos源码阅读方法是什么

这篇文章主要介绍"Nacos源码阅读方法是什么"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Nacos源码阅读方法是什么"文章能帮助大家解决问题。

先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识。

有了这张图,我们就很容易去看nacos源码了。

如何找切入点

首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手

             com.alibaba.cloud            spring-cloud-starter-alibaba-nacos-discovery  

进入这个依赖文件,会发现它又依赖了一个组件:

            com.alibaba.cloud            spring-cloud-alibaba-nacos-discovery

进入依赖之后,我们发现它长这样:

从这张图中,我们发现了一个熟悉的配置文件spring.factories,这是sringboot自动装配的必备文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

因为这张主要说的是服务注册源码,所以我们可以只用关注(NacosServiceRegistryAutoConfiguration)自动装配文件

public class NacosServiceRegistryAutoConfiguration {        @Bean        public NacosServiceRegistry nacosServiceRegistry(                        NacosDiscoveryProperties nacosDiscoveryProperties) {                return new NacosServiceRegistry(nacosDiscoveryProperties);        }        @ConditionalOnBean(AutoServiceRegistrationProperties.class)        public NacosRegistration nacosRegistration(                        NacosDiscoveryProperties nacosDiscoveryProperties,                        ApplicationContext context) {                return new NacosRegistration(nacosDiscoveryProperties, context);        public NacosAutoServiceRegistration nacosAutoServiceRegistration(                        NacosServiceRegistry registry,                        AutoServiceRegistrationProperties autoServiceRegistrationProperties,                        NacosRegistration registration) {                return new NacosAutoServiceRegistration(registry,                                autoServiceRegistrationProperties, registration);}

我们看到的是三个bean注入,这里给大家介绍一个看源码的小技巧:自动装配的文件中申明的bean类,我们只需要看带有auto的bean,这个往往是入口;NacosAutoServiceRegistration 带有auto,我们点进去看看里面都有什么:

    @Override        protected void register() {                if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {                        log.debug("Registration disabled.");                        return;                }                if (this.registration.getPort() < 0) {                        this.registration.setPort(getPort().get());                }                super.register();        }

里面有一个register()方法,我在这里打个断点,因为我猜测这个就是注册的入口,我现在使用debug模式,启动一个服务,看它会不会调用这个方法:

客户端注册

这里贴上我debug后,进入register方法的调用链截图

看到这个调用链,看到一个onApplicationEvent的回调方法,找到这个方法所在的类AbstractAutoServiceRegistration
这个类继承了ApplicationListener这个多播器监听器,spring启动之后,会发布多播器事件,然后回调实现多播器组件的onApplicationEvent方法,我们从这个方法开始分析:

public void onApplicationEvent(WebServerInitializedEvent event) {                bind(event); // 绑定端口,并启动        }                        @Deprecatedpublic void bind(WebServerInitializedEvent event) {// 设置端口    this.port.compareAndSet(0, event.getWebServer().getPort());    // 启动客户端注册组件        this.start();}public void start() {        // 省略分支代码        // 调用注册                        register();        }

因为springcloud提供了多种注册中心扩展,但是我们这里只引用了nacos注册中心,所以这里直接调用的是NacosServiceRegistry的register方法:

  public void register(Registration registration) {    // 省略分支代码    // 获取服务id                String serviceId = registration.getServiceId();                // 获取组配置                String group = nacosDiscoveryProperties.getGroup();     // 封装服务实例                Instance instance = getNacosInstanceFromRegistration(registration);                // 调用 命名服务的 registerInstance方法 注册实例                        namingService.registerInstance(serviceId, group, instance);        }

进入到registerInstance方法

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {        if (instance.isEphemeral()) {            // 省略分支代码            // 与服务端建立心跳,默认每隔5秒定时发送新跳包            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);        }        // 通过http方式向服务端发送注册请求        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);    }

serverproxy通过调用对http进行封装的reapi方法,向服务端接口("/nacos/v1/ns/instance")发送请求,

   public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});        Map params = new HashMap(9);        params.put("namespaceId", this.namespaceId);        params.put("serviceName", serviceName);        params.put("groupName", groupName);        params.put("clusterName", instance.getClusterName());        params.put("ip", instance.getIp());        params.put("port", String.valueOf(instance.getPort()));        params.put("weight", String.valueOf(instance.getWeight()));        params.put("enable", String.valueOf(instance.isEnabled()));        params.put("healthy", String.valueOf(instance.isHealthy()));        params.put("ephemeral", String.valueOf(instance.isEphemeral()));        params.put("metadata", JSON.toJSONString(instance.getMetadata()));        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");    }

我们知道nacos经常是以集群形式部署的,那客户端是如何选择其中一个节点发送呢,肯定得实现负载均衡的逻辑,我们点击reqAPI,看它是如何实现的

 if (servers != null && !servers.isEmpty()) {                Random random = new Random(System.currentTimeMillis());                // 随机获取一个索引,servers保存的是所有nacos节点地址                int index = random.nextInt(servers.size());                // 遍历所有节点,根据index值,从servers中找到对应位置的server,进行请求调用,如果调用成功则返回,否则依次往后遍历,直到请求成功                for(int i = 0; i < servers.size(); ++i) {                    String server = (String)servers.get(index);                    try {                        return this.callServer(api, params, server, method);                    } catch (NacosException var11) {                        exception = var11;                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);                    } catch (Exception var12) {                        exception = var12;                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);                    }                    // index+1 然后取模 是保证index不会越界                    index = (index + 1) % servers.size();                }                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());            }

到这里,客户端注册的代码已经分析完了,不过这还不是本篇的结束,我们还得继续分析服务端是如何处理客户端发送过来的注册请求:

服务端处理客户端注册请求

如果需要查看服务端源码的话,则需要将nacos源码下下来 下载地址

我们从服务注册api接口地址(/nacos/v1/ns/instance),可以找到对应的controller为(com.alibaba.nacos.naming.controllers.InstanceController)

因为注册实例发送的是post请求,所以直接找被postmapping注解的register方法

 @CanDistro    @PostMapping    public String register(HttpServletRequest request) throws Exception {// 获取服务名        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);// 获取命名空间id        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);// 注册实例serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));        return "ok";    }

我们点击进入到registerInstance方法:

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {        createEmptyService(namespaceId, serviceName, instance.isEphemeral());        Service service = getService(namespaceId, serviceName);        if (service == null) {            throw new NacosException(NacosException.INVALID_PARAM,                "service not found, namespace: " + namespaceId + ", service: " + serviceName);        }// 执行添加实例的操作        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);    }

分析

在nacos中,注册实例后,还需要将注册信息同步到其他节点,所有在nacos中存在两种同步模式AP和CP,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
com.alibaba.nacos.naming.core.ServiceManager.addInstance()

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {        // 生成服务的key        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);        // 获取服务        Service service = getService(namespaceId, serviceName);        // 使用同步锁处理        synchronized (service) {            List instanceList = addIpAddresses(service, ephemeral, ips);            Instances instances = new Instances();            instances.setInstanceList(instanceList);            // 调用consistencyService.put 处理同步过来的服务            consistencyService.put(key, instances);        }    }

我们在进入到consistencyService.put方法中

点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

    @Override    public void put(String key, Record value) throws NacosException {        // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步        mapConsistencyService(key).put(key, value);    }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

   private ConsistencyService mapConsistencyService(String key) {        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;    }

AP 方式同步的流程(ephemeralConsistencyService) 本地服务器处理注册信息&将注册信息同步到其它节点

    @Override    public void put(String key, Record value) throws NacosException {        // 处理本地注册列表        onPut(key, value);        // 添加阻塞任务,同步信息到其他集群节点        taskDispatcher.addTask(key);    }

处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

       @Override        public void run() {            List keys = new ArrayList<>();            while (true) {                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),                        TimeUnit.MILLISECONDS);                    // 省略判断代码                    // 添加同步的key                    keys.add(key);                    // 计数                    dataSize++;                    // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {                        // 遍历所有集群节点,直接调用http进行同步                        for (Server member : dataSyncer.getServers()) {                            if (NetUtils.localServer().equals(member.getKey())) {                                continue;                            }                            SyncTask syncTask = new SyncTask();                            syncTask.setKeys(keys);                            syncTask.setTargetServer(member.getKey());                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));                            }                            dataSyncer.submit(syncTask, 0);                        }                        // 记录本次同步时间                        lastDispatchTime = System.currentTimeMillis();                        // 计数清零                        dataSize = 0;                    }            }        }    }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

同步少量的key解决方案: 只有积累到指定数量的key,才发起批量同步距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步 CP 方式同步的流程(RaftConsistencyServiceImpl)

cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

nacos使用raft协议来进行选举leader,来实现cp模式。

同样进入到 RaftConsistencyServiceImpl的put方法

    @Override    public void put(String key, Record value) throws NacosException {        try {            raftCore.signalPublish(key, value);        } catch (Exception e) {            Loggers.RAFT.error("Raft put failed.", e);            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);        }    }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走if (!isLeader()) {            JSONObject params = new JSONObject();            params.put("key", key);            params.put("value", value);            Map parameters = new HashMap<>(1);            parameters.put("key", key);            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);            return;        }

同样采用同样队列的方式,去处理本地注册列表

onPublish(datum, peers.local());public void onPublish(Datum datum, RaftPeer source) throws Exception {               // 添加同步key任务到阻塞队列中        notifier.addTask(datum.key, ApplyAction.CHANGE);        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);    }

遍历所有集群节点,发送http同步请求

 for (final String server : peers.allServersIncludeMyself()) {                // 如果是leader,则不进行同步                if (isLeader(server)) {                    latch.countDown();                    continue;                }                // 组装url 发送同步请求到其它集群节点                final String url = buildURL(server, API_ON_PUB);                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler() {                    @Override                    public Integer onCompleted(Response response) throws Exception {                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",                                datum.key, server, response.getStatusCode());                            return 1;                        }                        latch.countDown();                        return 0;                    }                    @Override                    public STATE onContentWriteCompleted() {                        return STATE.CONTINUE;                    }                });            }

关于"Nacos源码阅读方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。

同步 方法 服务 节点 方式 源码 处理 信息 线程 集群 队列 阻塞 代码 实例 客户 客户端 文件 时间 模式 问题 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器打开iis 山东京博软件开发招聘信息 上海广厦网络技术有限公司 pt配置dhcp服务器 软件开发即征即退手续 威海软件开发团队 七日杀服务器取消商店 互联网企业与科技企业的区别 北海市app软件开发 编写一个简单的服务器接口 5u 服务器 网络安全就在身边手抄报 甘肃慧鑫网络技术有限公司 临沂市万网网络技术有限公司 物联网云服务器最低配置 mysql设置数据库默认用户 护苗网络安全序列课程视频 虹口区直销软件开发推荐货源 腾讯云服务器怎么加入安全组 改过的电脑网络安全密钥忘了 网络安全系列儿童画 互联网大公司用的是什么服务器 python做的软件开发 软件开发人员价值评价 不可替代的语音对讲软件开发 美讯网络技术 服务器日常维修判断故障常用命令 一个关系数据库中的文件 网络安全技术与实训报告心得 无锡云服务器哪个厂家质量好
0