
What does ServiceReporter do in Nacos?

Published: 2025-02-04. Author: 千家信息网 editor. Last updated: 2025-02-04.

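This article explains what ServiceReporter does in Nacos by walking through the relevant source code in nacos 1.1.3: where the task is scheduled, what it does on each run, and how its message reaches the other servers.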

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    @Component
    @DependsOn("nacosApplicationContext")
    public class ServiceManager implements RecordListener<Service> {

        /**
         * Map<namespace, Map<group::serviceName, Service>>
         */
        private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

        private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

        private Synchronizer synchronizer = new ServiceStatusSynchronizer();

        private final Lock lock = new ReentrantLock();

        @Resource(name = "consistencyDelegate")
        private ConsistencyService consistencyService;

        @Autowired
        private SwitchDomain switchDomain;

        @Autowired
        private DistroMapper distroMapper;

        @Autowired
        private ServerListManager serverListManager;

        @Autowired
        private PushService pushService;

        private final Object putServiceLock = new Object();

        @PostConstruct
        public void init() {

            // One-shot schedule: the first ServiceReporter run happens after 60 seconds
            UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

            UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

            try {
                Loggers.SRV_LOG.info("listen for service meta change");
                consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
            } catch (NacosException e) {
                Loggers.SRV_LOG.error("listen for service meta change failed!");
            }
        }

        //......
    }
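  • The init method of ServiceManager schedules a ServiceReporter on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with an initial delay of 60000 ms. The call is schedule, not scheduleAtFixedRate, so it triggers a single run.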
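
Because schedule runs a task only once, a job that should repeat has to re-submit itself, which is what ServiceReporter does in its finally block. The pattern can be sketched in isolation as follows. This is a minimal illustration using only the JDK, not Nacos code: the class name SelfReschedulingDemo, the method runRepeatedly, and the stop-after-N-runs condition are all assumptions made so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfReschedulingDemo {

    /**
     * Schedules a one-shot task that re-submits itself from its finally block
     * until it has run `times` times. Returns the number of runs, or -1 on timeout.
     * (Hypothetical helper; the stop condition exists only so the demo ends.)
     */
    static int runRepeatedly(int times, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);

        Runnable reporter = new Runnable() {
            @Override
            public void run() {
                try {
                    runs.incrementAndGet(); // stand-in for the real work
                } finally {
                    done.countDown();
                    // Re-submit in finally so a failure in the work cannot stop the cycle
                    if (done.getCount() > 0) {
                        executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        };

        // Initial one-shot schedule, analogous to the call in init()
        executor.schedule(reporter, periodMillis, TimeUnit.MILLISECONDS);

        boolean finished = false;
        try {
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return finished ? runs.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runRepeatedly(3, 10));
    }
}
```

One consequence of re-submitting instead of using a fixed-rate schedule is that the delay can be read afresh before every run, so a period that is changed at runtime takes effect on the next cycle.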

ServiceReporter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceReporter implements Runnable {

        @Override
        public void run() {
            try {

                Map<String, Set<String>> allServiceNames = getAllServiceNames();

                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }

                for (String namespaceId : allServiceNames.keySet()) {

                    // One checksum report per namespace
                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        // Skip services that this server is not responsible for
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }

                        Service service = getService(namespaceId, serviceName);

                        if (service == null) {
                            continue;
                        }

                        service.recalculateChecksum();

                        checksum.addItem(serviceName, service.getChecksum());
                    }

                    Message msg = new Message();

                    msg.setData(JSON.toJSONString(checksum));

                    List<Server> sameSiteServers = serverListManager.getServers();

                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }

                    for (Server server : sameSiteServers) {
                        // Do not send the report to ourselves
                        if (server.getKey().equals(NetUtils.localServer())) {
                            continue;
                        }
                        synchronizer.send(server.getKey(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                // Reschedule the next run, whether or not this one succeeded
                UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
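  • ServiceReporter implements Runnable. For each namespace in allServiceNames, its run method keeps only the service names for which distroMapper.responsible returns true, calls recalculateChecksum on each service, and adds the resulting checksum to a ServiceChecksum. It then serializes the ServiceChecksum into a Message and uses synchronizer.send to deliver it to every server in sameSiteServers except the local one. Finally, in the finally block, it reschedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with a delay of switchDomain.getServiceStatusSynchronizationPeriodMillis().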
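
The two filtering steps in run (report only the services this node is responsible for, and send only to the other servers) can be sketched with plain collections. This is a simplified illustration, not the Nacos implementation: the class ChecksumReportSketch, the methods buildReport and peers, and the use of a Predicate in place of distroMapper.responsible are assumptions, and checksums are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

public class ChecksumReportSketch {

    /**
     * Builds serviceName -> checksum for the services accepted by `responsible`.
     * `responsible` is a stand-in for distroMapper.responsible(serviceName).
     */
    static Map<String, String> buildReport(Map<String, String> checksumsByService,
                                           Predicate<String> responsible) {
        Map<String, String> report = new TreeMap<>();
        for (Map.Entry<String, String> entry : checksumsByService.entrySet()) {
            if (responsible.test(entry.getKey())) {
                report.put(entry.getKey(), entry.getValue());
            }
        }
        return report;
    }

    /** Returns every server key except the local one, preserving order. */
    static List<String> peers(List<String> servers, String localServer) {
        List<String> result = new ArrayList<>();
        for (String server : servers) {
            if (!server.equals(localServer)) {
                result.add(server);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> checksums = new TreeMap<>();
        checksums.put("order-service", "abc");
        checksums.put("user-service", "def");

        // Pretend this node is responsible only for names starting with "order"
        System.out.println(buildReport(checksums, name -> name.startsWith("order")));

        List<String> servers = new ArrayList<>();
        servers.add("10.0.0.1:8848");
        servers.add("10.0.0.2:8848");
        System.out.println(peers(servers, "10.0.0.1:8848"));
    }
}
```

Since each node reports only the services it is responsible for, a receiving node can compare the incoming checksums with its own copies and detect data that has drifted; how the receiver reacts is outside the code shown in this article.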

ServiceStatusSynchronizer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java

    public class ServiceStatusSynchronizer implements Synchronizer {

        @Override
        public void send(final String serverIP, Message msg) {
            if (serverIP == null) {
                return;
            }

            Map<String, String> params = new HashMap<String, String>(10);

            params.put("statuses", msg.getData());
            params.put("clientIP", NetUtils.localServer());

            // Default: append the local server port to the target IP
            String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
                    UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

            // If the target already carries a port, use it as is
            if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
                url = "http://" + serverIP + RunningConfig.getContextPath() +
                        UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
            }

            try {
                HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);

                            return 1;
                        }
                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
            }

        }

        @Override
        public Message get(String serverIP, String key) {
            if (serverIP == null) {
                return null;
            }

            Map<String, String> params = new HashMap<>(10);

            params.put("key", key);

            String result;
            try {
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
                }
                result = NamingProxy.reqAPI(RunningConfig.getContextPath()
                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
                return null;
            }

            if (result == null || result.equals(StringUtils.EMPTY)) {
                return null;
            }

            Message msg = new Message();
            msg.setData(result);

            return msg;
        }
    }
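  • ServiceStatusSynchronizer implements the Synchronizer interface. Its send method issues an asynchronous HTTP POST to the target server's /service/status endpoint, carrying the checksum data in the statuses parameter and the sender's address in clientIP. A non-200 response or an exception is only logged as a warning; the request is not retried.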
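
The URL handling in send has two branches: a target that already contains a port is used as is, otherwise the local server port is appended. A minimal stand-alone sketch of that rule follows. The class StatusUrlSketch and the method statusUrl are hypothetical, and the sample values "/nacos" for the context path and "/v1/ns" for the naming context are assumptions passed in as parameters rather than read from RunningConfig and UtilsAndCommons.

```java
public class StatusUrlSketch {

    /**
     * Builds the status-report URL for a target server.
     * `server` may be "ip" or "ip:port"; `defaultPort` is used only in the first case.
     * (Hypothetical helper mirroring the two branches in send.)
     */
    static String statusUrl(String server, int defaultPort, String contextPath, String namingContext) {
        String hostAndPort = server.contains(":") ? server : server + ":" + defaultPort;
        return "http://" + hostAndPort + contextPath + namingContext + "/service/status";
    }

    public static void main(String[] args) {
        // Target without a port: the default port is appended
        System.out.println(statusUrl("10.0.0.2", 8848, "/nacos", "/v1/ns"));
        // Target with its own port: used unchanged
        System.out.println(statusUrl("10.0.0.2:8849", 8848, "/nacos", "/v1/ns"));
    }
}
```

Note that the fallback in the original code uses the sender's own port, so it is only correct when all servers listen on the same port; server keys that include the port avoid that assumption.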

Summary

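ServiceReporter is a Runnable that ServiceManager schedules at startup. On each run it collects, per namespace, the recalculated checksums of the services the local server is responsible for (as decided by distroMapper.responsible), wraps them in a Message, and sends that message through synchronizer.send to every other server in sameSiteServers. It then reschedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR, so the report repeats periodically.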
