千家信息网

nacos中notifyConfigInfo有什么用

发表于:2024-11-11 作者:千家信息网编辑
千家信息网最后更新 2024年11月11日,nacos中notifyConfigInfo有什么用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。CommunicationContro
千家信息网最后更新 2024年11月11日nacos中notifyConfigInfo有什么用

nacos中notifyConfigInfo有什么用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

CommunicationController

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java

@Controller@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)public class CommunicationController {    private final DumpService dumpService;    private final LongPollingService longPollingService;    private String trueStr = "true";    @Autowired    public CommunicationController(DumpService dumpService, LongPollingService longPollingService) {        this.dumpService = dumpService;        this.longPollingService = longPollingService;    }    /**     * 通知配置信息改变     */    @RequestMapping(value = "/dataChange", method = RequestMethod.GET)    @ResponseBody    public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,                                    @RequestParam("dataId") String dataId, @RequestParam("group") String group,                                    @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)                                        String tenant,                                    @RequestParam(value = "tag", required = false) String tag) {        dataId = dataId.trim();        group = group.trim();        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);        String isBetaStr = request.getHeader("isBeta");        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);        } else {            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);        }        return true;    }    //......}
  • notifyConfigInfo方法主要是执行dumpService.dump方法,只是是否beta调用的dump方法不同

DumpService

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java

@Servicepublic class DumpService {    @Autowired    private Environment env;    @Autowired    PersistService persistService;    @PostConstruct    public void init() {        LogUtil.defaultLog.warn("DumpService start");        DumpProcessor processor = new DumpProcessor(this);        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);        dumpTaskMgr = new TaskManager(            "com.alibaba.nacos.server.DumpTaskManager");        dumpTaskMgr.setDefaultTaskProcessor(processor);        dumpAllTaskMgr = new TaskManager(            "com.alibaba.nacos.server.DumpAllTaskManager");        dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);        //......    }        /**     * 全量dump间隔     */    static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;    /**     * 全量dump间隔     */    static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;    private TaskManager dumpTaskMgr;    private TaskManager dumpAllTaskMgr;    private static final Logger log = LoggerFactory.getLogger(DumpService.class);    static final AtomicInteger FINISHED = new AtomicInteger();    static final int INIT_THREAD_COUNT = 10;    int total = 0;    private final static String TRUE_STR = "true";    private final static String BETA_TABLE_NAME = "config_info_beta";    private final static String TAG_TABLE_NAME = "config_info_tag";    Boolean isQuickStart = false;    private int retentionDays = 30;    //......    public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {        String groupKey = GroupKey2.getKey(dataId, group, tenant);        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));    }    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {        dump(dataId, group, tenant, tag, lastModified, handleIp, false);    }    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,                     boolean isBeta) {        String groupKey = GroupKey2.getKey(dataId, group, tenant);        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));    }    //......}
  • dump方法最后是往dumpTaskMgr添加DumpTask;dumpTaskMgr的defaultTaskProcessor为dumpProcessor

TaskManager

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java

public final class TaskManager implements TaskManagerMBean {    private static final Logger log = LogUtil.defaultLog;    private final ConcurrentHashMap tasks = new ConcurrentHashMap();    private final ConcurrentHashMap taskProcessors =        new ConcurrentHashMap();    private TaskProcessor defaultTaskProcessor;    Thread processingThread;    private final AtomicBoolean closed = new AtomicBoolean(true);    private String name;    class ProcessRunnable implements Runnable {        @Override        public void run() {            while (!TaskManager.this.closed.get()) {                try {                    Thread.sleep(100);                    TaskManager.this.process();                } catch (Throwable e) {                }            }        }    }    ReentrantLock lock = new ReentrantLock();    Condition notEmpty = this.lock.newCondition();    public TaskManager() {        this(null);    }    public AbstractTask getTask(String type) {        return this.tasks.get(type);    }    public TaskProcessor getTaskProcessor(String type) {        return this.taskProcessors.get(type);    }    @SuppressWarnings("PMD.AvoidManuallyCreateThreadRule")    public TaskManager(String name) {        this.name = name;        if (null != name && name.length() > 0) {            this.processingThread = new Thread(new ProcessRunnable(), name);        } else {            this.processingThread = new Thread(new ProcessRunnable());        }        this.processingThread.setDaemon(true);        this.closed.set(false);        this.processingThread.start();    }    //......    /**     * 将任务加入到任务Map中     *     * @param type     * @param task     */    public void addTask(String type, AbstractTask task) {        this.lock.lock();        try {            AbstractTask oldTask = tasks.put(type, task);            MetricsMonitor.getDumpTaskMonitor().set(tasks.size());            if (null != oldTask) {                task.merge(oldTask);            }        } finally {            this.lock.unlock();        }    }    protected void process() {        for (Map.Entry entry : this.tasks.entrySet()) {            AbstractTask task = null;            this.lock.lock();            try {                // 获取任务                task = entry.getValue();                if (null != task) {                    if (!task.shouldProcess()) {                        // 任务当前不需要被执行,直接跳过                        continue;                    }                    // 先将任务从任务Map中删除                    this.tasks.remove(entry.getKey());                    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());                }            } finally {                this.lock.unlock();            }            if (null != task) {                // 获取任务处理器                TaskProcessor processor = this.taskProcessors.get(entry.getKey());                if (null == processor) {                    // 如果没有根据任务类型设置的处理器,使用默认处理器                    processor = this.getDefaultTaskProcessor();                }                if (null != processor) {                    boolean result = false;                    try {                        // 处理任务                        result = processor.process(entry.getKey(), task);                    } catch (Throwable t) {                        log.error("task_fail", "处理task失败", t);                    }                    if (!result) {                        // 任务处理失败,设置最后处理时间                        task.setLastProcessTime(System.currentTimeMillis());                        // 将任务重新加入到任务Map中                        this.addTask(entry.getKey(), task);                    }                }            }        }        if (tasks.isEmpty()) {            this.lock.lock();            try {                this.notEmpty.signalAll();            } finally {                this.lock.unlock();            }        }    }    //......}
  • TaskManager的addTask方法往tasks添加AbstractTask;其构造器启动了ProcessRunnable,其run方法主要是执行TaskManager.this.process()方法;该方法会遍历tasks,取出任务,然后通过TaskProcessor的process方法来执行任务

DumpProcessor

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpTask.java

class DumpProcessor implements TaskProcessor {    DumpProcessor(DumpService dumpService) {        this.dumpService = dumpService;    }    @Override    public boolean process(String taskType, AbstractTask task) {        DumpTask dumpTask = (DumpTask)task;        String[] pair = GroupKey2.parseKey(dumpTask.groupKey);        String dataId = pair[0];        String group = pair[1];        String tenant = pair[2];        long lastModified = dumpTask.lastModified;        String handleIp = dumpTask.handleIp;        boolean isBeta = dumpTask.isBeta;        String tag = dumpTask.tag;        if (isBeta) {            // beta发布,则dump数据,更新beta缓存            ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant);            boolean result;            if (null != cf) {                result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps());                if (result) {                    ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                        ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,                        cf.getContent().length());                }            } else {                result = ConfigService.removeBeta(dataId, group, tenant);                if (result) {                    ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                        ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);                }            }            return result;        } else {            if (StringUtils.isBlank(tag)) {                ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);                if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {                    if (null != cf) {                        AggrWhitelist.load(cf.getContent());                    } else {                        AggrWhitelist.load(null);                    }                }                if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {                    if (null != cf) {                        ClientIpWhiteList.load(cf.getContent());                    } else {                        ClientIpWhiteList.load(null);                    }                }                if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {                    if (null != cf) {                        SwitchService.load(cf.getContent());                    } else {                        SwitchService.load(null);                    }                }                boolean result;                if (null != cf) {                    result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);                    if (result) {                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,                            cf.getContent().length());                    }                } else {                    result = ConfigService.remove(dataId, group, tenant);                    if (result) {                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);                    }                }                return result;            } else {                ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag);                //                boolean result;                if (null != cf) {                    result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified);                    if (result) {                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,                            cf.getContent().length());                    }                } else {                    result = ConfigService.removeTag(dataId, group, tenant, tag);                    if (result) {                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);                    }                }                return result;            }        }    }    final DumpService dumpService;}
  • DumpProcessor实现了TaskProcessor接口,其process方法主要是根据不同条件执行ConfigService.dump或者remove方法

看完上述内容,你们掌握nacos中notifyConfigInfo有什么用的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0