千家信息网

What does updateInstance in nacos ServiceManager do?

Published: 2025-02-04 Author: 千家信息网 editor
Last updated: 2025-02-04


This article walks through updateInstance in nacos ServiceManager.

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
}

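  • updateInstance checks that the instance to be updated exists via service.allIPs().contains(instance); if it does not, it throws a NacosException, otherwise it calls addInstance.

  • addInstance fetches the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with action set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.

  • updateIpAddresses first reads the datum from consistencyService and obtains currentIPs via service.allIPs(ephemeral), then builds oldInstanceMap from the datum. For UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap; for any other action it puts the instance into instanceMap, so an existing entry with the same datum key is overwritten.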
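
To make the "update is an add that overwrites" idea concrete, here is a minimal sketch of the merge step. It does not use the nacos classes: Inst, its key() method (ip:port) and the action strings are hypothetical stand-ins for Instance, getDatumKey() and the UtilsAndCommons constants, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the instance-list merge; not the real nacos types.
class InstanceMergeSketch {

    static final String ACTION_ADD = "add";       // hypothetical action names
    static final String ACTION_REMOVE = "remove";

    /** Hypothetical minimal instance: identity is ip:port, weight is mutable data. */
    static final class Inst {
        final String ip;
        final int port;
        final double weight;

        Inst(String ip, int port, double weight) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    /** Copies the current list into a map, then removes or overwrites the given entries. */
    static List<Inst> merge(List<Inst> current, String action, Inst... changes) {
        Map<String, Inst> merged = new HashMap<>();
        for (Inst inst : current) {
            merged.put(inst.key(), inst);
        }
        for (Inst change : changes) {
            if (ACTION_REMOVE.equals(action)) {
                merged.remove(change.key());
            } else {
                // Same key: the new value replaces the old one.
                merged.put(change.key(), change);
            }
        }
        return new ArrayList<>(merged.values());
    }

    /** Mirrors the shape of updateInstance: reject unknown instances, then reuse the add path. */
    static List<Inst> update(List<Inst> current, Inst updated) {
        boolean exists = current.stream().anyMatch(i -> i.key().equals(updated.key()));
        if (!exists) {
            throw new IllegalArgumentException("instance not exist: " + updated.key());
        }
        return merge(current, ACTION_ADD, updated);
    }

    public static void main(String[] args) {
        List<Inst> current = new ArrayList<>();
        current.add(new Inst("10.0.0.1", 8080, 1.0));
        List<Inst> after = update(current, new Inst("10.0.0.1", 8080, 0.5));
        System.out.println(after.size() + " instance(s), weight=" + after.get(0).weight);
    }
}
```

Because the merged map is keyed by instance identity, the list size stays the same after an update and only the stored value changes.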

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, ApplyAction.CHANGE);
    }

    //......
}

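  • DistroConsistencyServiceImpl.put first calls onPut and then taskDispatcher.addTask(key). When the key is an ephemeral instance list key, onPut creates a Datum, increments its timestamp, and stores it in dataStore; then, only if a listener is registered for the key, it calls notifier.addTask(key, ApplyAction.CHANGE).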
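
The put path can be sketched as "store locally, notify local listeners if any, then queue the key for replication". The class below is an illustration under assumptions, not the nacos implementation: Entry, EPHEMERAL_PREFIX, listenedKeys, notifyTasks and syncTasks are hypothetical names, and plain lists stand in for the Notifier and TaskDispatcher queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for the put/onPut flow; not the real nacos classes.
class EphemeralPutSketch {

    // Hypothetical prefix; the real check is KeyBuilder.matchEphemeralInstanceListKey(key).
    static final String EPHEMERAL_PREFIX = "ephemeral.";

    /** Hypothetical stand-in for Datum: key, value and a timestamp counter. */
    static final class Entry {
        final String key;
        final Object value;
        final AtomicLong timestamp = new AtomicLong(0L);

        Entry(String key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    final Map<String, Entry> store = new ConcurrentHashMap<>();
    final Set<String> listenedKeys = ConcurrentHashMap.newKeySet();
    final List<String> notifyTasks = new ArrayList<>(); // stands in for the Notifier queue
    final List<String> syncTasks = new ArrayList<>();   // stands in for the TaskDispatcher queue

    /** Local write first, then schedule replication of the key. */
    void put(String key, Object value) {
        onPut(key, value);
        syncTasks.add(key);
    }

    void onPut(String key, Object value) {
        if (key.startsWith(EPHEMERAL_PREFIX)) {
            Entry entry = new Entry(key, value);
            entry.timestamp.incrementAndGet();
            store.put(key, entry);
        }
        // No listener registered for this key: nothing to notify.
        if (!listenedKeys.contains(key)) {
            return;
        }
        notifyTasks.add(key);
    }

    public static void main(String[] args) {
        EphemeralPutSketch sketch = new EphemeralPutSketch();
        sketch.put("ephemeral.demo", "v1");
        sketch.listenedKeys.add("ephemeral.demo");
        sketch.put("ephemeral.demo", "v2");
        System.out.println("notify=" + sketch.notifyTasks.size() + ", sync=" + sketch.syncTasks.size());
    }
}
```

Note that the replication task is queued on every put, while the local notification depends on a listener being registered for the key.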

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }

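  • Notifier.addTask returns early for a CHANGE action whose key is already in services; otherwise a CHANGE key is recorded in services, and the task is appended to tasks. The run method keeps taking entries from tasks, removes the key from services, and invokes onChange or onDelete on each listener registered for the key.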
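
The services map acts as a "pending CHANGE" marker, so repeated changes to the same key collapse into one queued task until it is consumed. A minimal sketch of that de-duplication follows; Task, pending and pollOne are hypothetical names, and pollOne is a non-blocking stand-in for the take() call inside run.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative stand-in for the Notifier's task de-duplication; not the real nacos class.
class ChangeDedupSketch {

    enum Action { CHANGE, DELETE }

    /** Hypothetical replacement for the Pair of (datumKey, action). */
    static final class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    private final ConcurrentHashMap<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);

    void addTask(String key, Action action) {
        // A CHANGE for this key is already queued: skip the duplicate.
        if (pending.containsKey(key) && action == Action.CHANGE) {
            return;
        }
        if (action == Action.CHANGE) {
            pending.put(key, "");
        }
        tasks.add(new Task(key, action));
    }

    /** Takes one task without blocking and clears its pending marker; null if the queue is empty. */
    Task pollOne() {
        Task task = tasks.poll();
        if (task != null) {
            pending.remove(task.key);
        }
        return task;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        ChangeDedupSketch notifier = new ChangeDedupSketch();
        notifier.addTask("k1", Action.CHANGE);
        notifier.addTask("k1", Action.CHANGE); // dropped as a duplicate
        notifier.addTask("k1", Action.DELETE); // DELETE is never de-duplicated
        System.out.println("queued tasks: " + notifier.size());
    }
}
```

Dropping duplicate CHANGE tasks is safe in this design because the consumer reads the latest value from the store when it handles the task, not a value captured at enqueue time.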

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}

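  • TaskDispatcher.addTask picks a TaskScheduler from taskSchedulerList (indexed by UtilsAndCommons.shakeUp(key, cpuCoreCount)) and calls its addTask, which offers the key to queue. TaskScheduler.run keeps polling queue and batches the keys; once the batch reaches getBatchSyncKeyCount(), or more than getTaskDispatchPeriod() milliseconds have passed since the last dispatch, it submits one SyncTask per remote server through dataSyncer.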
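
The flush rule in TaskScheduler.run ("batch is full, or the dispatch period has elapsed") can be isolated into a small sketch. This is an assumption-laden simplification: the clock is passed in as a parameter so the behaviour is deterministic, the dispatched list stands in for the per-server SyncTask submission, and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the size-or-time batching rule; not the real nacos class.
class BatchDispatchSketch {

    private final int batchSize;
    private final long periodMillis;

    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime = 0L;

    // Each element is one flushed batch; the real code would build a SyncTask per remote server.
    final List<List<String>> dispatched = new ArrayList<>();

    BatchDispatchSketch(int batchSize, long periodMillis) {
        this.batchSize = batchSize;
        this.periodMillis = periodMillis;
    }

    /** Buffers a key and flushes when the batch is full or the period has elapsed since the last flush. */
    void onKey(String key, long nowMillis) {
        keys.add(key);
        if (keys.size() == batchSize || nowMillis - lastDispatchTime > periodMillis) {
            dispatched.add(keys);
            keys = new ArrayList<>();
            lastDispatchTime = nowMillis;
        }
    }

    public static void main(String[] args) {
        BatchDispatchSketch scheduler = new BatchDispatchSketch(3, 1000L);
        scheduler.onKey("a", 100L);
        scheduler.onKey("b", 200L);
        scheduler.onKey("c", 300L);  // batch of 3 is full: flush
        scheduler.onKey("d", 400L);
        scheduler.onKey("e", 1500L); // more than 1000 ms since the last flush: flush
        System.out.println("batches: " + scheduler.dispatched);
    }
}
```

One consequence of this rule is that a flush only happens when a key arrives, so a partially filled batch waits for the next key before the time condition is evaluated.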

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {
        return keys;
    }

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getLastExecuteTime() {
        return lastExecuteTime;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public String getTargetServer() {
        return targetServer;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}

  • SyncTask carries keys and targetServer (plus retryCount and lastExecuteTime); targetServer tells DataSyncer which server to sync to.

Summary

  • updateInstance checks that the instance to be updated exists via service.allIPs().contains(instance); if it does not, it throws a NacosException, otherwise it calls addInstance.

  • addInstance fetches the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with action set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.

  • updateIpAddresses first reads the datum from consistencyService and obtains currentIPs via service.allIPs(ephemeral), then builds oldInstanceMap from the datum. For UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap; for any other action it puts the instance into instanceMap.