How DistroConsistencyServiceImpl Works in Nacos and What It Does

Published: 2025-01-31. Author: 千家信息网 editorial staff

This article walks through the source of DistroConsistencyServiceImpl in Nacos 1.1.3: the interfaces it implements, how it initializes itself from other servers in the cluster, and how its Notifier dispatches change events to listeners.

ConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java

public interface ConsistencyService {

    /**
     * Put a data related to a key to Nacos cluster
     *
     * @param key   key of data, this key should be globally unique
     * @param value value of data
     * @throws NacosException
     * @see
     */
    void put(String key, Record value) throws NacosException;

    /**
     * Remove a data from Nacos cluster
     *
     * @param key key of data
     * @throws NacosException
     */
    void remove(String key) throws NacosException;

    /**
     * Get a data from Nacos cluster
     *
     * @param key key of data
     * @return data related to the key
     * @throws NacosException
     */
    Datum get(String key) throws NacosException;

    /**
     * Listen for changes of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void listen(String key, RecordListener listener) throws NacosException;

    /**
     * Cancel listening of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void unlisten(String key, RecordListener listener) throws NacosException;

    /**
     * Tell the status of this consistency service
     *
     * @return true if available
     */
    boolean isAvailable();
}

  • ConsistencyService defines the put, remove, get, listen, unlisten, and isAvailable methods.
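To make the contract more concrete, here is a minimal in-memory sketch of the same put/remove/get/listen/unlisten shape. It is not Nacos code: the class name InMemoryConsistencySketch and its nested Listener interface are made up for illustration, values are plain Strings instead of Record/Datum, and listeners are called synchronously rather than through a queue.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical, simplified stand-in for the ConsistencyService contract; not Nacos code. */
class InMemoryConsistencySketch {

    /** Simplified listener; the real RecordListener works with Record values, not Strings. */
    interface Listener {
        void onChange(String key, String value);
        void onDelete(String key);
    }

    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final Map<String, List<Listener>> listeners = new ConcurrentHashMap<>();

    /** Stores the value, then notifies every listener registered for the key. */
    void put(String key, String value) {
        store.put(key, value);
        List<Listener> registered = listeners.get(key);
        if (registered != null) {
            for (Listener l : registered) {
                l.onChange(key, value);
            }
        }
    }

    /** Deletes the value, notifies listeners, and drops the listener list for the key. */
    void remove(String key) {
        store.remove(key);
        List<Listener> registered = listeners.remove(key);
        if (registered != null) {
            for (Listener l : registered) {
                l.onDelete(key);
            }
        }
    }

    String get(String key) {
        return store.get(key);
    }

    /** Registers a listener once per key; a repeated registration is ignored. */
    void listen(String key, Listener listener) {
        List<Listener> registered = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
        if (!registered.contains(listener)) {
            registered.add(listener);
        }
    }

    void unlisten(String key, Listener listener) {
        List<Listener> registered = listeners.get(key);
        if (registered != null) {
            registered.remove(listener);
        }
    }
}
```

A caller would register a listener with listen("someKey", listener) and then see onChange fire on each put for that key, which is the same pattern the Nacos naming module relies on, only without the cluster synchronization.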

EphemeralConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java

public interface EphemeralConsistencyService extends ConsistencyService {
}

  • The EphemeralConsistencyService interface extends ConsistencyService and declares no methods of its own.

DistroConsistencyServiceImpl

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");
            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    Loggers.DISTRO.error("load data failed.", e);
                }
            }
        });

        executor.submit(notifier);
    }

    public void load() throws Exception {
        if (SystemUtils.STANDALONE_MODE) {
            initialized = true;
            return;
        }
        // size = 1 means only myself in the list, we need at least one another server alive:
        while (serverListManager.getHealthyServers().size() <= 1) {
            Thread.sleep(1000L);
            Loggers.DISTRO.info("waiting server list init...");
        }

        for (Server server : serverListManager.getHealthyServers()) {
            if (NetUtils.localServer().equals(server.getKey())) {
                continue;
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync from " + server);
            }
            // try sync data from remote server:
            if (syncAllDataFromRemote(server)) {
                initialized = true;
                return;
            }
        }
    }

    //......

    public boolean syncAllDataFromRemote(Server server) {
        try {
            byte[] data = NamingProxy.getAllData(server.getKey());
            processData(data);
            return true;
        } catch (Exception e) {
            Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
            return false;
        }
    }

    public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap =
                serializer.deserializeMap(data, Instances.class);

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                dataStore.put(entry.getKey(), entry.getValue());

                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
    }

    //......

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    @Override
    public void remove(String key) throws NacosException {
        onRemove(key);
        listeners.remove(key);
    }

    @Override
    public Datum get(String key) throws NacosException {
        return dataStore.get(key);
    }

    //......

    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            listeners.put(key, new CopyOnWriteArrayList<>());
        }

        if (listeners.get(key).contains(listener)) {
            return;
        }

        listeners.get(key).add(listener);
    }

    @Override
    public void unlisten(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            return;
        }
        for (RecordListener recordListener : listeners.get(key)) {
            if (recordListener.equals(listener)) {
                listeners.get(key).remove(listener);
                break;
            }
        }
    }

    @Override
    public boolean isAvailable() {
        return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
    }

    //......
}
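  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.

  • Its init method runs load asynchronously. Outside standalone mode, load waits until more than one healthy server is known, then calls syncAllDataFromRemote on each remote server in turn and stops at the first one that succeeds. syncAllDataFromRemote fetches the full data set through NamingProxy.getAllData and hands it to processData, which deserializes it, invokes the registered listener callbacks, and writes the entries into dataStore. Finally, init submits the Notifier to a single-threaded executor.

  • put calls onPut and then taskDispatcher.addTask(key); remove calls onRemove and then listeners.remove(key); get reads directly from dataStore; listen registers a RecordListener; unlisten removes one; isAvailable returns true when isInitialized() is true or when the overridden server status is ServerStatus.UP.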
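The fallback loop in load and the isAvailable rule can be sketched in isolation. The class below is hypothetical (PeerBootstrapSketch and its method signatures are not Nacos APIs): server addresses are plain Strings, and the syncFrom predicate stands in for syncAllDataFromRemote. One thing the sketch makes visible is that if every peer fails, initialized stays false and only a manually overridden UP status makes the service report itself as available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of the load() fallback loop; names are illustrative, not Nacos APIs. */
class PeerBootstrapSketch {

    private boolean initialized = false;
    private final List<String> attempted = new ArrayList<>();

    /**
     * Tries each healthy peer except ourselves and stops at the first successful full sync.
     * syncFrom stands in for syncAllDataFromRemote and returns true on success.
     */
    void load(List<String> healthyServers, String self, Predicate<String> syncFrom) {
        for (String server : healthyServers) {
            if (self.equals(server)) {
                continue; // never sync from ourselves
            }
            attempted.add(server);
            if (syncFrom.test(server)) {
                initialized = true;
                return;
            }
        }
        // Falling through means every peer failed; initialized stays false.
    }

    /** Mirrors the isAvailable() rule: initialized, or status manually overridden to UP. */
    boolean isAvailable(String overriddenStatus) {
        return initialized || "UP".equals(overriddenStatus);
    }

    /** Peers that were contacted, in order; exposed only to make the sketch observable. */
    List<String> attempted() {
        return attempted;
    }
}
```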

Notifier

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }

  • Notifier implements Runnable. Its run method takes tasks from a LinkedBlockingQueue one at a time and calls every listener registered for the task's key: onChange for ApplyAction.CHANGE and onDelete for ApplyAction.DELETE. addTask skips a CHANGE task when one for the same key is already waiting in the queue.
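The de-duplication in addTask is easy to miss, so here is a small standalone sketch of it. Everything in it is hypothetical (DedupNotifierSketch, Action, Task, processOne); it records notifications in a list instead of calling listeners, and processOne polls without blocking so the behavior can be checked deterministically. It also uses putIfAbsent to make the check-and-mark step atomic, which is a small deviation from the containsKey/put pair in the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the Notifier's queue with CHANGE de-duplication; not Nacos code. */
class DedupNotifierSketch {

    enum Action { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that already have a CHANGE task waiting in the queue.
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);
    private final List<String> notified = new ArrayList<>();

    /** Queues a task; a CHANGE for a key that already has a queued CHANGE is dropped. */
    void addTask(String key, Action action) {
        if (action == Action.CHANGE && pending.putIfAbsent(key, "") != null) {
            return;
        }
        tasks.add(new Task(key, action));
    }

    int getTaskSize() {
        return tasks.size();
    }

    /** Handles one queued task without blocking; returns false when the queue is empty. */
    boolean processOne() {
        Task task = tasks.poll();
        if (task == null) {
            return false;
        }
        pending.remove(task.key); // later CHANGE tasks for this key may be queued again
        notified.add(task.action + ":" + task.key);
        return true;
    }

    List<String> notified() {
        return notified;
    }
}
```

Collapsing repeated CHANGE tasks is safe here because the real Notifier reads the latest value from dataStore when it handles the task, so one notification per key is enough to deliver the newest state.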

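Summary

  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.

  • Its init method runs load asynchronously; load calls syncAllDataFromRemote to initialize, which fetches the data through NamingProxy.getAllData and then runs processData to invoke the listener callbacks and write the entries into dataStore. init ends by submitting the Notifier for asynchronous execution.

  • put calls onPut and then taskDispatcher.addTask(key); remove calls onRemove and then listeners.remove(key); get reads directly from dataStore; listen registers a RecordListener; unlisten removes one; isAvailable is decided by isInitialized() and the ServerStatus.UP override.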