千家信息网

nacos中DataSyncer的作用是什么

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇文章为大家展示了nacos中DataSyncer的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。DataSyncernacos-1.1.3/na
千家信息网最后更新 2024年12月13日nacos中DataSyncer的作用是什么

本篇文章为大家展示了nacos中DataSyncer的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

DataSyncer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java

@Component@DependsOn("serverListManager")public class DataSyncer {    @Autowired    private DataStore dataStore;    @Autowired    private GlobalConfig partitionConfig;    @Autowired    private Serializer serializer;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    private Map taskMap = new ConcurrentHashMap<>();    @PostConstruct    public void init() {        startTimedSync();    }    public void submit(SyncTask task, long delay) {        // If it's a new task:        if (task.getRetryCount() == 0) {            Iterator iterator = task.getKeys().iterator();            while (iterator.hasNext()) {                String key = iterator.next();                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {                    // associated key already exist:                    if (Loggers.DISTRO.isDebugEnabled()) {                        Loggers.DISTRO.debug("sync already in process, key: {}", key);                    }                    iterator.remove();                }            }        }        if (task.getKeys().isEmpty()) {            // all keys are removed:            return;        }        GlobalExecutor.submitDataSync(new Runnable() {            @Override            public void run() {                try {                    if (getServers() == null || getServers().isEmpty()) {                        Loggers.SRV_LOG.warn("try to sync data but server list is empty.");                        return;                    }                    List keys = task.getKeys();                    if (Loggers.DISTRO.isDebugEnabled()) {                        Loggers.DISTRO.debug("sync keys: {}", keys);                    }                    Map datumMap = dataStore.batchGet(keys);                    if (datumMap == null || datumMap.isEmpty()) {                        // clear all flags of this task:                        for (String key : task.getKeys()) {                            taskMap.remove(buildKey(key, task.getTargetServer()));                        }                        return;                    }                    byte[] data = serializer.serialize(datumMap);                    long timestamp = System.currentTimeMillis();                    boolean success = NamingProxy.syncData(data, task.getTargetServer());                    if (!success) {                        SyncTask syncTask = new SyncTask();                        syncTask.setKeys(task.getKeys());                        syncTask.setRetryCount(task.getRetryCount() + 1);                        syncTask.setLastExecuteTime(timestamp);                        syncTask.setTargetServer(task.getTargetServer());                        retrySync(syncTask);                    } else {                        // clear all flags of this task:                        for (String key : task.getKeys()) {                            taskMap.remove(buildKey(key, task.getTargetServer()));                        }                    }                } catch (Exception e) {                    Loggers.DISTRO.error("sync data failed.", e);                }            }        }, delay);    }    public void retrySync(SyncTask syncTask) {        Server server = new Server();        server.setIp(syncTask.getTargetServer().split(":")[0]);        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));        if (!getServers().contains(server)) {            // if server is no longer in healthy server list, ignore this task:            return;        }        // TODO may choose other retry policy.        submit(syncTask, partitionConfig.getSyncRetryDelay());    }    public void startTimedSync() {        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());    }    //......    public List getServers() {        return serverListManager.getHealthyServers();    }    public String buildKey(String key, String targetServer) {        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;    }}
  • DataSyncer定义了submit、retrySync、startTimedSync、getServers等方法,其init方法会执行startTimedSync

  • submit方法对于retryCount为0的任务会判断taskMap是否存在该任务如果存在则移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一个sync任务,它主要是通过NamingProxy.syncData来同步,成功则移除,不成功则使用retrySync重试

  • retrySync则重新构建server调用submit执行;startTimedSync方法则是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任务;getServers则通过serverListManager.getHealthyServers()返回健康的实例

TimedSync

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java

    public class TimedSync implements Runnable {        @Override        public void run() {            try {                if (Loggers.DISTRO.isDebugEnabled()) {                    Loggers.DISTRO.debug("server list is: {}", getServers());                }                // send local timestamps to other servers:                Map keyChecksums = new HashMap<>(64);                for (String key : dataStore.keys()) {                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {                        continue;                    }                    keyChecksums.put(key, dataStore.get(key).value.getChecksum());                }                if (keyChecksums.isEmpty()) {                    return;                }                if (Loggers.DISTRO.isDebugEnabled()) {                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);                }                for (Server member : getServers()) {                    if (NetUtils.localServer().equals(member.getKey())) {                        continue;                    }                    NamingProxy.syncCheckSums(keyChecksums, member.getKey());                }            } catch (Exception e) {                Loggers.DISTRO.error("timed sync task failed.", e);            }        }    }
  • TimedSync会使用NamingProxy.syncCheckSums同步keyChecksums进行校验

小结

  • DataSyncer定义了submit、retrySync、startTimedSync、getServers等方法,其init方法会执行startTimedSync

  • submit方法对于retryCount为0的任务会判断taskMap是否存在该任务如果存在则移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一个sync任务,它主要是通过NamingProxy.syncData来同步,成功则移除,不成功则使用retrySync重试

  • retrySync则重新构建server调用submit执行;startTimedSync方法则是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任务;getServers则通过serverListManager.getHealthyServers()返回健康的实例

上述内容就是nacos中DataSyncer的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0