
How selectInstances in NacosNamingService Works and What It Does

Published: 2025-02-02. Author: 千家信息网 editorial staff

This article walks through the selectInstances methods of NacosNamingService in the Nacos 1.1.3 client: what they return and how they obtain the data.

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {

    private static final String DEFAULT_PORT = "8080";

    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;

    private String endpoint;

    private String serverList;

    private String cacheDir;

    private String logName;

    private HostReactor hostReactor;

    private BeatReactor beatReactor;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    //......

    @Override
    public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, new ArrayList<String>(), healthy);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe)
        throws NacosException {
        return selectInstances(serviceName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
        throws NacosException {
        return selectInstances(serviceName, clusters, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, clusters, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,
                                          boolean subscribe) throws NacosException {
        return selectInstances(serviceName, Constants.DEFAULT_GROUP, clusters, healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }

    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }

        Iterator<Instance> iterator = list.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
                iterator.remove();
            }
        }

        return list;
    }

    //......
}
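  • selectInstances first obtains a ServiceInfo from hostReactor: if subscribe is true it calls hostReactor.getServiceInfo, otherwise it calls hostReactor.getServiceInfoDirectlyFromServer. It then removes from serviceInfo.getHosts() every instance whose health status differs from the healthy argument, that is not enabled, or whose weight is less than or equal to 0, and returns what is left. The shorter overloads delegate to this one, defaulting the group to Constants.DEFAULT_GROUP, clusters to an empty list, and subscribe to true.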
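The filtering rule can be tried in isolation. The following is a minimal sketch, not the Nacos implementation: SketchInstance is a hypothetical stand-in for the Nacos Instance class that carries only the fields the filter reads, and unlike the original, which removes entries from the list returned by getHosts(), this version builds a new list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Nacos Instance class (assumption, not the real type).
class SketchInstance {
    final String ip;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    SketchInstance(String ip, boolean healthy, boolean enabled, double weight) {
        this.ip = ip;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

class SelectInstancesSketch {
    // Keeps an instance only if its health matches the flag,
    // it is enabled, and its weight is positive.
    static List<SketchInstance> select(List<SketchInstance> hosts, boolean healthy) {
        List<SketchInstance> result = new ArrayList<SketchInstance>();
        if (hosts == null) {
            return result;
        }
        for (SketchInstance instance : hosts) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                result.add(instance);
            }
        }
        return result;
    }
}
```

Note that passing healthy = false does not mean "return everything": it returns only the unhealthy instances that are still enabled and have a positive weight.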

HostReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

public class HostReactor {

    private static final long DEFAULT_DELAY = 1000L;

    private static final long UPDATE_HOLD_INTERVAL = 5000L;

    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

    private Map<String, ServiceInfo> serviceInfoMap;

    private Map<String, Object> updatingMap;

    private PushReceiver pushReceiver;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    private FailoverReactor failoverReactor;

    private String cacheDir;

    private ScheduledExecutorService executor;

    //......

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }

        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {

            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }

        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {

        String key = ServiceInfo.getKey(serviceName, clusters);

        return serviceInfoMap.get(key);
    }

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {

            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }

            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JSON.parseObject(result, ServiceInfo.class);
        }
        return null;
    }

    //......
}
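  • getServiceInfo first checks failoverReactor.isFailoverSwitch(); if failover mode is on, it returns failoverReactor.getService(key) straight away. Otherwise it looks the service up in serviceInfoMap through getServiceInfo0. On a miss it creates a new ServiceInfo, puts it into serviceInfoMap, marks the service in updatingMap, calls updateServiceNow, and then removes the mark from updatingMap. On a hit where the service is currently marked in updatingMap, it waits on serviceObj for up to UPDATE_HOLD_INTERVAL (5000 ms) so the in-flight update can finish. In every non-failover case it finally calls scheduleUpdateIfAbsent and returns the entry read from serviceInfoMap.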

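  • updateServiceNow fetches the result from serverProxy.queryList, passes it to processServiceJSON, which parses it and updates serviceInfoMap as needed, and finally calls notifyAll on the old ServiceInfo to wake any waiting callers. scheduleUpdateIfAbsent checks whether futureMap already holds a task for the key and, if not, adds an UpdateTask; the check is repeated inside synchronized (futureMap) so that only one task is created per key.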

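  • getServiceInfoDirectlyFromServer calls serverProxy.queryList directly (with udpPort 0, so no push port is passed along) and parses the response into a ServiceInfo. It does not touch serviceInfoMap and schedules no update task; it returns null when the response is empty.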
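The cache-miss path and the check-lock-check scheduling described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the Nacos code: HostReactorSketch, queryServer, and the counters are hypothetical names, the "server" is a local stub, ConcurrentHashMap replaces the original HashMap, the "@@" key format is a placeholder for ServiceInfo.getKey, and failover mode and the wait/notify hold are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the subscribe=true lookup path.
class HostReactorSketch {
    private final Map<String, String> serviceInfoMap = new ConcurrentHashMap<String, String>();
    private final Map<String, Object> futureMap = new ConcurrentHashMap<String, Object>();

    // Counters exist only so the behaviour can be observed.
    final AtomicInteger serverQueries = new AtomicInteger();
    final AtomicInteger tasksScheduled = new AtomicInteger();

    String getServiceInfo(String serviceName, String clusters) {
        String key = serviceName + "@@" + clusters; // placeholder key format (assumption)
        if (serviceInfoMap.get(key) == null) {
            // Cache miss: query the server synchronously, as updateServiceNow does.
            serviceInfoMap.put(key, queryServer(key));
        }
        scheduleUpdateIfAbsent(key);
        return serviceInfoMap.get(key);
    }

    // Check, lock, check again: at most one refresh task per key.
    void scheduleUpdateIfAbsent(String key) {
        if (futureMap.get(key) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return;
            }
            tasksScheduled.incrementAndGet();
            futureMap.put(key, new Object()); // stands in for the ScheduledFuture
        }
    }

    // Stub for serverProxy.queryList; returns a fake payload.
    private String queryServer(String key) {
        serverQueries.incrementAndGet();
        return "hosts-of:" + key;
    }
}
```

Calling getServiceInfo twice for the same service and clusters hits the stub server once and registers one task; a different clusters value produces a separate key and therefore a separate task.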

UpdateTask

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }

                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }
        }
    }

    public void refreshOnly(String serviceName, String clusters) {
        try {
            serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
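  • UpdateTask implements Runnable. Its run method first reads serviceObj from serviceInfoMap. If the entry is missing, it calls updateServiceNow and reschedules itself after DEFAULT_DELAY (1000 ms). If the entry exists, it compares serviceObj.getLastRefTime() with lastRefTime: when it is less than or equal, the task calls updateServiceNow and re-reads serviceObj; otherwise the data has already been refreshed by a push, so it calls refreshOnly, which queries the server but discards the response. Finally it reschedules itself after serviceObj.getCacheMillis() and records the new lastRefTime. Because the rescheduling call sits inside the try block, an exception thrown before it is only logged and the task is not scheduled again.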
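The self-rescheduling pattern used by UpdateTask, a one-shot task that submits itself again at the end of each run instead of using scheduleAtFixedRate, can be sketched as below. The class and method names are hypothetical, and the run count and latch exist only so the sketch terminates; the real task keeps rescheduling itself with a delay taken from the service's cacheMillis.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SelfReschedulingSketch {
    // Runs a task three times, each run scheduling the next one.
    // Returns true if all three runs completed within five seconds.
    static boolean runThreeTimes(final long delayMillis) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch runs = new CountDownLatch(3);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                runs.countDown();
                if (runs.getCount() > 0) {
                    // Re-submit this same task; the delay could differ on every run.
                    executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                }
            }
        };

        executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        try {
            return runs.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Scheduling each run individually lets the delay change between runs, which is what allows UpdateTask to use DEFAULT_DELAY on a miss and getCacheMillis() otherwise.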

NamingProxy

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    //......

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));

        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }

    //......
}
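  • queryList sends a GET request to the /instance/list endpoint to query the list of service instances, passing the namespace ID, service name, clusters, UDP port, client IP, and healthyOnly flag as parameters.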
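To make the shape of that request concrete, here is a hedged sketch that assembles the same parameters into a URL path and query string. It does not call Nacos: QueryListSketch and buildUrl are hypothetical, and the literal values "/nacos/v1/ns", "namespaceId", and "serviceName" are assumptions about what UtilAndComs.NACOS_URL_BASE and the CommonParams constants resolve to in this version.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class QueryListSketch {
    // Assumed value of UtilAndComs.NACOS_URL_BASE; not confirmed by the source.
    static final String URL_BASE = "/nacos/v1/ns";

    static String buildUrl(String namespaceId, String serviceName, String clusters,
                           int udpPort, String clientIp, boolean healthyOnly) {
        // LinkedHashMap keeps insertion order so the output is predictable.
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("namespaceId", namespaceId); // assumed CommonParams.NAMESPACE_ID
        params.put("serviceName", serviceName); // assumed CommonParams.SERVICE_NAME
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));

        StringBuilder url = new StringBuilder(URL_BASE).append("/instance/list");
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            url.append(separator).append(entry.getKey()).append('=').append(encode(entry.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

The grouped service name produced by NamingUtils.getGroupedName contains "@@", which is percent-encoded in the query string.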

Summary

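selectInstances first obtains a ServiceInfo from hostReactor: if subscribe is true it calls hostReactor.getServiceInfo, otherwise it calls hostReactor.getServiceInfoDirectlyFromServer. It then removes from serviceInfo.getHosts() every instance whose health status differs from the healthy argument, that is not enabled, or whose weight is less than or equal to 0, and returns the rest.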
