千家信息网

How subscribe and unsubscribe work in NacosNamingService and how to use them

Published: 2025-01-25 Author: 千家信息网 editor

This article explains how subscribe and unsubscribe work in NacosNamingService and how to use them, following the client source code step by step.

The focus is the subscribe and unsubscribe methods of NacosNamingService, read from the Nacos 1.1.3 client source.

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {

    private static final String DEFAULT_PORT = "8080";

    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;

    private String endpoint;

    private String serverList;

    private String cacheDir;

    private String logName;

    private HostReactor hostReactor;

    private BeatReactor beatReactor;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    //......

    @Override
    public void subscribe(String serviceName, EventListener listener) throws NacosException {
        subscribe(serviceName, new ArrayList<String>(), listener);
    }

    @Override
    public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
        subscribe(serviceName, groupName, new ArrayList<String>(), listener);
    }

    @Override
    public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
        subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
    }

    @Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
            StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
    }

    @Override
    public void unsubscribe(String serviceName, EventListener listener) throws NacosException {
        unsubscribe(serviceName, new ArrayList<String>(), listener);
    }

    @Override
    public void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
        unsubscribe(serviceName, groupName, new ArrayList<String>(), listener);
    }

    @Override
    public void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
        unsubscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
    }

    @Override
    public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }

    //......
}
  • subscribe fetches the ServiceInfo through hostReactor.getServiceInfo and passes it to eventDispatcher.addListener; unsubscribe calls eventDispatcher.removeListener
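The shorter overloads above only fill in defaults before reaching the four-argument method, which prefixes the service name with its group and joins the cluster list with commas. The following is a minimal, self-contained sketch of that funnel. `SubscribeFunnel` and `keyFor` are hypothetical names, not Nacos classes, and the `DEFAULT_GROUP` value and `@@` separator are assumptions about what `Constants.DEFAULT_GROUP`, `NamingUtils.getGroupedName` and `ServiceInfo.getKey` produce in the client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring how the subscribe overloads fill in defaults.
class SubscribeFunnel {
    // Assumed values; the real ones are defined in the Nacos client constants.
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";
    static final String SEPARATOR = "@@";

    // Service name only: default group, empty cluster list.
    static String keyFor(String serviceName) {
        return keyFor(serviceName, DEFAULT_GROUP, new ArrayList<String>());
    }

    // Service name and group: empty cluster list.
    static String keyFor(String serviceName, String groupName) {
        return keyFor(serviceName, groupName, new ArrayList<String>());
    }

    // Service name and clusters: default group.
    static String keyFor(String serviceName, List<String> clusters) {
        return keyFor(serviceName, DEFAULT_GROUP, clusters);
    }

    // Full form: every other overload ends up here.
    static String keyFor(String serviceName, String groupName, List<String> clusters) {
        String groupedName = groupName + SEPARATOR + serviceName;
        String joined = String.join(",", clusters);
        // With no clusters the key is just the grouped name.
        return joined.isEmpty() ? groupedName : groupedName + SEPARATOR + joined;
    }
}
```

Under these assumptions, a listener subscribed with one cluster list and unsubscribed with another would target different keys, so it seems safest to pass the same group and clusters to both calls.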

EventDispatcher

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/EventDispatcher.java

public class EventDispatcher {

    private ExecutorService executor = null;

    private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();

    private ConcurrentMap<String, List<EventListener>> observerMap
        = new ConcurrentHashMap<String, List<EventListener>>();

    public EventDispatcher() {

        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);

                return thread;
            }
        });

        executor.execute(new Notifier());
    }

    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
        observers.add(listener);

        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }

        serviceChanged(serviceInfo);
    }

    public void removeListener(String serviceName, String clusters, EventListener listener) {

        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");

        List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iterator<EventListener> iter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }

    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }

    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }

        changedServices.add(serviceInfo);
    }

    private class Notifier implements Runnable {
        @Override
        public void run() {
            while (true) {
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }

                if (serviceInfo == null) {
                    continue;
                }

                try {
                    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                        }
                    }

                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                }
            }
        }
    }

    public void setExecutor(ExecutorService executor) {
        ExecutorService oldExecutor = this.executor;
        this.executor = executor;

        oldExecutor.shutdown();
    }
}
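  • The EventDispatcher constructor creates a single-thread executor with a daemon thread and submits a Notifier to it. The Notifier runs a while (true) loop that calls changedServices.poll(5, TimeUnit.MINUTES) to take the next ServiceInfo; when it gets one, it looks up the matching EventListener list in observerMap and calls listener.onEvent on each listener in turn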

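  • addListener creates the observers list for a key in observerMap, or appends to the list already there, and then calls serviceChanged; removeListener removes the given listener from observerMap and deletes the key once its listener list is empty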

  • serviceChanged adds the serviceInfo to changedServices; the Notifier thread can then pick it up asynchronously and run the listener.onEvent callbacks
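The three points above describe a small producer/consumer arrangement: a registry of listeners per key, a queue of changed keys, and one background thread that drains the queue and fires callbacks. The sketch below reproduces that shape with the JDK only. `MiniDispatcher` and its `Listener` interface are hypothetical names, plain strings stand in for ServiceInfo, and it uses CopyOnWriteArrayList where the Nacos code uses Collections.synchronizedList, so the notifier can iterate without extra locking.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the dispatcher pattern; not the Nacos class.
class MiniDispatcher {
    interface Listener {
        void onEvent(String key);
    }

    private final BlockingQueue<String> changed = new LinkedBlockingQueue<>();
    private final ConcurrentMap<String, List<Listener>> observers = new ConcurrentHashMap<>();

    MiniDispatcher() {
        // One daemon thread delivers every callback, as the Notifier does.
        Thread notifier = new Thread(this::notifyLoop, "mini-dispatcher-notifier");
        notifier.setDaemon(true);
        notifier.start();
    }

    void addListener(String key, Listener listener) {
        List<Listener> fresh = new CopyOnWriteArrayList<>();
        fresh.add(listener);
        // putIfAbsent returns the list already registered, or null if ours won.
        List<Listener> existing = observers.putIfAbsent(key, fresh);
        if (existing != null) {
            existing.add(listener);
        }
        // Enqueue the key so the new listener is called back with the current state.
        changed.add(key);
    }

    void removeListener(String key, Listener listener) {
        List<Listener> list = observers.get(key);
        if (list != null) {
            list.removeIf(l -> l.equals(listener));
            if (list.isEmpty()) {
                observers.remove(key);
            }
        }
    }

    boolean hasListeners(String key) {
        return observers.containsKey(key);
    }

    private void notifyLoop() {
        while (true) {
            String key;
            try {
                key = changed.poll(5, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                return; // this sketch stops on interrupt; the original swallows it
            }
            if (key == null) {
                continue; // timed out with nothing queued
            }
            List<Listener> list = observers.get(key);
            if (list == null) {
                continue;
            }
            for (Listener l : list) {
                try {
                    l.onEvent(key);
                } catch (RuntimeException e) {
                    // One failing listener must not kill the delivery thread.
                }
            }
        }
    }
}
```

Because delivery happens on the single notifier thread rather than the caller's thread, a slow listener in a design like this delays every other callback, so listeners are best kept short.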

Summary

NacosNamingService's subscribe method calls eventDispatcher.addListener, and its unsubscribe method calls eventDispatcher.removeListener.
