千家信息网

nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么

发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,本篇内容主要讲解"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小
千家信息网最后更新 2025年01月29日nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么

本篇内容主要讲解"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么"吧!

本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component@DependsOn("nacosApplicationContext")public class ServiceManager implements RecordListener {    /**     * Map>     */    private Map> serviceMap = new ConcurrentHashMap<>();    private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);    private Synchronizer synchronizer = new ServiceStatusSynchronizer();    private final Lock lock = new ReentrantLock();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    @Autowired    private PushService pushService;    private final Object putServiceLock = new Object();    @PostConstruct    public void init() {        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());        try {            Loggers.SRV_LOG.info("listen for service meta change");            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);        } catch (NacosException e) {            Loggers.SRV_LOG.error("listen for service meta change failed!");        }    }    //......}
  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务

UpdatedServiceProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class UpdatedServiceProcessor implements Runnable {        //get changed service from other server asynchronously        @Override        public void run() {            ServiceKey serviceKey = null;            try {                while (true) {                    try {                        serviceKey = toBeUpdatedServicesQueue.take();                    } catch (Exception e) {                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");                    }                    if (serviceKey == null) {                        continue;                    }                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));                }            } catch (Exception e) {                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);            }        }    }
  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater

ServiceUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceUpdater implements Runnable {        String namespaceId;        String serviceName;        String serverIP;        public ServiceUpdater(ServiceKey serviceKey) {            this.namespaceId = serviceKey.getNamespaceId();            this.serviceName = serviceKey.getServiceName();            this.serverIP = serviceKey.getServerIP();        }        @Override        public void run() {            try {                updatedHealthStatus(namespaceId, serviceName, serverIP);            } catch (Exception e) {                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",                    serviceName, serverIP, e);            }        }    }
  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

ServiceManager.updatedHealthStatus

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component@DependsOn("nacosApplicationContext")public class ServiceManager implements RecordListener {    /**     * Map>     */    private Map> serviceMap = new ConcurrentHashMap<>();    private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);    private Synchronizer synchronizer = new ServiceStatusSynchronizer();    private final Lock lock = new ReentrantLock();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    @Autowired    private PushService pushService;    private final Object putServiceLock = new Object();    //......    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));        JSONObject serviceJson = JSON.parseObject(msg.getData());        JSONArray ipList = serviceJson.getJSONArray("ips");        Map ipsMap = new HashMap<>(ipList.size());        for (int i = 0; i < ipList.size(); i++) {            String ip = ipList.getString(i);            String[] strings = ip.split("_");            ipsMap.put(strings[0], strings[1]);        }        Service service = getService(namespaceId, serviceName);        if (service == null) {            return;        }        boolean changed = false;        List instances = service.allIPs();        for (Instance instance : instances) {            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));            if (valid != instance.isHealthy()) {                changed = true;                instance.setHealthy(valid);                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}",                    serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),                    instance.getIp(), instance.getPort(), instance.getClusterName());            }        }        if (changed) {            pushService.serviceChanged(service);        }        StringBuilder stringBuilder = new StringBuilder();        List allIps = service.allIPs();        for (Instance instance : allIps) {            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");        }        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",                service.getNamespaceId(), service.getName(), stringBuilder.toString());        }    }    //......}
  • updatedHealthStatus方法会从synchronizer获取msg,组装ipsMap,之后通过service.allIPs()获取instances信息,然后遍历instances从ipsMap获取实例的valid状态,如果与instance的isHealthy()对不上则标记为changed,更新instance的healthy;对于changed的则通过pushService.serviceChanged(service)发布事件,最后打印日志

小结

  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务

  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater

  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

到此,相信大家对"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

方法 作用 原理 不断 任务 元素 内容 接口 学习 循环 实用 更深 事件 信息 兴趣 实例 实用性 实际 小结 操作简单 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 计算机网络技术 百度文档 数据库统计函数列表 福建海能软件开发有限公司职位 三支一扶期满后有没有服务器 电子公告eb和数据库db 安全 统一使用服务器共享 服务器安装云锁 投资者评估数据库 湖南特岗服务器 计算机系统与网络技术学士 服务器托管 业务下滑 辽宁软件开发公司有哪些 数据库ssms工具什么意思 苏州比较有名的网络安全公司 广州追光动力网络技术 招聘 服务器png图片 台式机用服务器内存效果怎样 数据库原理及应用 下载 网络安全教育个人心得体会 网络安全行业能做到多大 我的世界服务器多人生存第二期 鹏鹏网络技术服务有限公司 简述软件开发愿景 属于关系数据库中内模式的范畴 服务器主板u盘启动项 智联招聘高级软件开发工程师 java应用服务器开发 数据库管理员负责设计系统 湖南郴州学计算机软件开发招生 王者中怎样选取手q服务器
0